diff --git a/.gitignore b/.gitignore
index cb64eaa..701dcbb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,5 @@
 local_settings.py
 seatable-python-runner/
 seatable-python-runner.zip
+
+.python-version
diff --git a/scheduler/app/faas_scheduler/models.py b/scheduler/app/faas_scheduler/models.py
index e2d38cc..747b3b1 100644
--- a/scheduler/app/faas_scheduler/models.py
+++ b/scheduler/app/faas_scheduler/models.py
@@ -39,7 +39,6 @@ def __init__(
         org_id,
         script_name,
         context_data,
-        started_at,
         operate_from=None,
     ):
         self.dtable_uuid = dtable_uuid
@@ -47,9 +46,17 @@
         self.org_id = org_id
         self.script_name = script_name
         self.context_data = context_data
-        self.started_at = started_at
         self.operate_from = operate_from
 
+    def get_info(self):
+        return {
+            "id": self.id,
+            "org_id": self.org_id,
+            "owner": self.owner,
+            "dtable_uuid": self.dtable_uuid,
+            "script_name": self.script_name,
+        }
+
     def to_dict(self):
         from faas_scheduler.utils import datetime_to_isoformat_timestr
 
@@ -61,7 +68,8 @@ def to_dict(self):
             "context_data": (
                 json.loads(self.context_data) if self.context_data else None
             ),
-            "started_at": datetime_to_isoformat_timestr(self.started_at),
+            "started_at": self.started_at
+            and datetime_to_isoformat_timestr(self.started_at),
             "finished_at": self.finished_at
             and datetime_to_isoformat_timestr(self.finished_at),
             "success": self.success,
diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py
index f43af3d..4ab902c 100644
--- a/scheduler/app/faas_scheduler/utils.py
+++ b/scheduler/app/faas_scheduler/utils.py
@@ -2,12 +2,17 @@
 import json
 import logging
 import requests
-from datetime import datetime
+from datetime import datetime, timedelta
 from uuid import UUID
 from tzlocal import get_localzone
 from sqlalchemy import desc, text
 
-from faas_scheduler.models import ScriptLog
+from faas_scheduler.models import (
+    ScriptLog,
+    UserRunScriptStatistics,
+    OrgRunScriptStatistics,
+    DTableRunScriptStatistics,
+)
 
 import sys
 
@@ -75,7 +80,6 @@ def ping_starter():
     return False
 
 
-## triggered from scheduler.py to remove old script_logs
 def delete_log_after_days(db_session):
     clean_script_logs = (
         "DELETE FROM `script_log` WHERE `started_at` < DATE_SUB(NOW(), INTERVAL %s DAY)"
@@ -94,7 +98,6 @@
     db_session.close()
 
 
-## triggered from scheduler.py to remove old statistics
 def delete_statistics_after_days(db_session):
     tables = [
         "dtable_run_script_statistics",
@@ -172,6 +175,7 @@
         },
         "context_data": context_data,
         "script_id": script_id,
+        "timeout": int(SUB_PROCESS_TIMEOUT),
     }
     headers = {"User-Agent": "python-scheduler/" + VERSION}
     logger.debug("I call starter at url %s", RUN_FUNC_URL)
@@ -196,66 +200,140 @@
     return None
 
 
-def update_statistics(db_session, dtable_uuid, owner, org_id, spend_time):
-    if not spend_time:
-        return
-    username = owner
-
-    # dtable_run_script_statistcis
-    sqls = [
-        """
-        INSERT INTO dtable_run_script_statistics(dtable_uuid, run_date, total_run_count, total_run_time, update_at) VALUES
-        (:dtable_uuid, :run_date, 1, :spend_time, :update_at)
-        ON DUPLICATE KEY UPDATE
-        total_run_count=total_run_count+1,
-        total_run_time=total_run_time+:spend_time,
-        update_at=:update_at;
-        """
-    ]
+def update_stats_run_count(db_session, dtable_uuid, owner, org_id):
+    run_date = datetime.today().strftime("%Y-%m-%d")
+    try:
+        dtable_stats = (
+            db_session.query(DTableRunScriptStatistics)
+            .filter_by(dtable_uuid=dtable_uuid, run_date=run_date)
+            .first()
+        )
+        if not dtable_stats:
+            dtable_stats = DTableRunScriptStatistics(
+                dtable_uuid=dtable_uuid,
+                run_date=run_date,
+                total_run_count=1,
+                total_run_time=0,
+                update_at=datetime.now(),
+            )
+            db_session.add(dtable_stats)
+        else:
+            dtable_stats.total_run_count += 1
+            dtable_stats.update_at = datetime.now()
+        if org_id == -1:
+            if "@seafile_group" not in owner:
+                user_stats = (
+                    db_session.query(UserRunScriptStatistics)
+                    .filter_by(username=owner, run_date=run_date)
+                    .first()
+                )
+                if not user_stats:
+                    user_stats = UserRunScriptStatistics(
+                        username=owner,
+                        run_date=run_date,
+                        total_run_count=1,
+                        total_run_time=0,
+                        update_at=datetime.now(),
+                    )
+                    db_session.add(user_stats)
+                else:
+                    user_stats.total_run_count += 1
+                    user_stats.update_at = datetime.now()
+        else:
+            org_stats = (
+                db_session.query(OrgRunScriptStatistics)
+                .filter_by(org_id=org_id, run_date=run_date)
+                .first()
+            )
+            if not org_stats:
+                org_stats = OrgRunScriptStatistics(
+                    org_id=org_id,
+                    run_date=run_date,
+                    total_run_count=1,
+                    total_run_time=0,
+                    update_at=datetime.now(),
+                )
+                db_session.add(org_stats)
+            else:
+                org_stats.total_run_count += 1
+                org_stats.update_at = datetime.now()
+        db_session.commit()
+    except Exception as e:
+        logger.exception(
+            "update stats for org_id %s owner %s dtable %s run count error %s",
+            org_id,
+            owner,
+            dtable_uuid,
+            e,
+        )
 
-    # org_run_script_statistics
-    if org_id and org_id != -1:
-        sqls += [
-            """
-            INSERT INTO org_run_script_statistics(org_id, run_date, total_run_count, total_run_time, update_at) VALUES
-            (:org_id, :run_date, 1, :spend_time, :update_at)
-            ON DUPLICATE KEY UPDATE
-            total_run_count=total_run_count+1,
-            total_run_time=total_run_time+:spend_time,
-            update_at=:update_at;
-            """
-        ]
-
-    # user_run_script_statistics
-    if "@seafile_group" not in username:
-        sqls += [
-            """
-            INSERT INTO user_run_script_statistics(username, org_id, run_date, total_run_count, total_run_time, update_at) VALUES
-            (:username, :org_id, :run_date, 1, :spend_time, :update_at)
-            ON DUPLICATE KEY UPDATE
-            org_id=:org_id,
-            total_run_count=total_run_count+1,
-            total_run_time=total_run_time+:spend_time,
-            update_at=:update_at;
-            """
-        ]
 
+
+def update_stats_run_time(db_session, dtable_uuid, owner, org_id, spend_time):
+    run_date = datetime.today().strftime("%Y-%m-%d")
     try:
-        for sql in sqls:
-            db_session.execute(
-                text(sql),
-                {
-                    "dtable_uuid": dtable_uuid,
-                    "username": username,
-                    "org_id": org_id,
-                    "run_date": datetime.today(),
-                    "spend_time": spend_time,
-                    "update_at": datetime.now(),
-                },
+        dtable_stats = (
+            db_session.query(DTableRunScriptStatistics)
+            .filter_by(dtable_uuid=dtable_uuid, run_date=run_date)
+            .first()
+        )
+        if not dtable_stats:
+            dtable_stats = DTableRunScriptStatistics(
+                dtable_uuid=dtable_uuid,
+                run_date=run_date,
+                total_run_count=1,
+                total_run_time=spend_time,
+                update_at=datetime.now(),
+            )
+            db_session.add(dtable_stats)
+        else:
+            dtable_stats.total_run_time += spend_time
+            dtable_stats.update_at = datetime.now()
+        if org_id == -1:
+            if "@seafile_group" not in owner:
+                user_stats = (
+                    db_session.query(UserRunScriptStatistics)
+                    .filter_by(username=owner, run_date=run_date)
+                    .first()
+                )
+                if not user_stats:
+                    user_stats = UserRunScriptStatistics(
+                        username=owner,
+                        run_date=run_date,
+                        total_run_count=1,
+                        total_run_time=spend_time,
+                        update_at=datetime.now(),
+                    )
+                    db_session.add(user_stats)
+                else:
+                    user_stats.total_run_time += spend_time
+                    user_stats.update_at = datetime.now()
+        else:
+            org_stats = (
+                db_session.query(OrgRunScriptStatistics)
+                .filter_by(org_id=org_id, run_date=run_date)
+                .first()
             )
+            if not org_stats:
+                org_stats = OrgRunScriptStatistics(
+                    org_id=org_id,
+                    run_date=run_date,
+                    total_run_count=1,
+                    total_run_time=spend_time,
+                    update_at=datetime.now(),
+                )
+                db_session.add(org_stats)
+            else:
+                org_stats.total_run_time += spend_time
+                org_stats.update_at = datetime.now()
         db_session.commit()
     except Exception as e:
-        logger.exception("update statistics sql error: %s", e)
+        logger.exception(
+            "update stats for org_id %s owner %s dtable %s run time error %s",
+            org_id,
+            owner,
+            dtable_uuid,
+            e,
+        )
 
 
 # required to get "script logs" in dtable-web
@@ -379,17 +457,21 @@ def add_script(
         org_id,
         script_name,
         context_data,
-        datetime.now(),
         operate_from,
     )
     db_session.add(script)
     db_session.commit()
 
+    update_stats_run_count(db_session, dtable_uuid, owner, org_id)
+
     return script
 
 
-def update_script(db_session, script, success, return_code, output):
-    script.finished_at = datetime.now()
+def update_script(
+    db_session, script, success, return_code, output, started_at, finished_at
+):
+    script.started_at = started_at
+    script.finished_at = finished_at
     script.success = success
     script.return_code = return_code
     script.output = output
@@ -414,17 +496,24 @@ def run_script(
         call_faas_func(script_url, temp_api_token, context_data, script_id=script_id)
     except Exception as e:
         logger.exception("Run script %d error: %s", script_id, e)
+        now = datetime.now()
+        hook_update_script(db_session, script_id, False, -1, "", now, 0)
     finally:
         db_session.close()
 
     return True
 
 
-def hook_update_script(db_session, script_id, success, return_code, output, spend_time):
+def hook_update_script(
+    db_session, script_id, success, return_code, output, started_at, spend_time
+):
     script = db_session.query(ScriptLog).filter_by(id=script_id).first()
     if script:
-        update_script(db_session, script, success, return_code, output)
-        update_statistics(
+        finished_at = started_at + timedelta(seconds=spend_time)
+        update_script(
+            db_session, script, success, return_code, output, started_at, finished_at
+        )
+        update_stats_run_time(
             db_session, script.dtable_uuid, script.owner, script.org_id, spend_time
         )
diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py
index 77feb16..8d28adf 100644
--- a/scheduler/app/flask_server.py
+++ b/scheduler/app/flask_server.py
@@ -8,16 +8,12 @@
 from datetime import datetime
 from flask import Flask, request, make_response
 from gevent.pywsgi import WSGIServer
-from concurrent.futures import ThreadPoolExecutor
 
 from database import DBSession
 from faas_scheduler.utils import (
     check_auth_token,
-    run_script,
     get_script,
-    add_script,
     get_run_script_statistics_by_month,
-    hook_update_script,
     can_run_task,
     get_run_scripts_count_monthly,
     ping_starter,
@@ -26,6 +22,7 @@
     uuid_str_to_32_chars,
     basic_log,
 )
+from scheduler import scheduler
 
 basic_log("scheduler.log")
 
@@ -36,11 +33,11 @@
 
 TIMEOUT_OUTPUT = (
     "The script's running time exceeded the limit and the execution was aborted."
 )
+HOST = os.environ.get("PYTHON_SCHEDULER_BIND_HOST", "127.0.0.1")
 
 app = Flask(__name__)
 logger = logging.getLogger(__name__)
-executor = ThreadPoolExecutor(max_workers=SCRIPT_WORKERS)
 
 
 @app.route("/ping/", methods=["GET"])
@@ -57,7 +54,7 @@ def ping():
 
 
 # called from dtable-web to start the python run
 @app.route("/run-script/", methods=["POST"])
-def scripts_api():
+def run_script_api():
     if not check_auth_token(request):
         return make_response(("Forbidden: the auth token is not correct.", 403))
 
@@ -74,8 +71,6 @@ def scripts_api():
     context_data = data.get("context_data")
     owner = data.get("owner")
     org_id = data.get("org_id")
-    script_url = data.get("script_url")
-    temp_api_token = data.get("temp_api_token")
     scripts_running_limit = data.get("scripts_running_limit", -1)
     operate_from = data.get("operate_from", "manualy")
     if not dtable_uuid or not script_name or not owner:
@@ -89,27 +84,17 @@
             owner, org_id, db_session, scripts_running_limit=scripts_running_limit
         ):
             return make_response(("The number of runs exceeds the limit"), 400)
-        script = add_script(
+        script_log = scheduler.add(
             db_session,
-            dtable_uuid,
-            owner,
+            uuid_str_to_32_chars(dtable_uuid),
             org_id,
+            owner,
             script_name,
             context_data,
             operate_from,
         )
-        logger.debug("lets call the starter to fire up the runner...")
-        executor.submit(
-            run_script,
-            script.id,
-            dtable_uuid,
-            script_name,
-            script_url,
-            temp_api_token,
-            context_data,
-        )
-        return make_response(({"script_id": script.id}, 200))
+        return make_response(({"script_id": script_log.id}, 200))
     except Exception as e:
         logger.exception(e)
         return make_response(("Internal server error", 500))
@@ -119,7 +104,7 @@ def scripts_api():
 
 
 # called from dtable-web to get the status of a specific run.
 @app.route("/run-script/<int:script_id>/", methods=["GET"])
-def script_api(script_id):
+def get_script_api(script_id):
     if not check_auth_token(request):
         return make_response(("Forbidden: the auth token is not correct.", 403))
 
@@ -142,19 +127,12 @@
         script = get_script(db_session, script_id)
         if not script:
             return make_response(("Not found", 404))
-        if dtable_uuid != script.dtable_uuid or script_name != script.script_name:
+        if (
+            uuid_str_to_32_chars(dtable_uuid) != script.dtable_uuid
+            or script_name != script.script_name
+        ):
             return make_response(("Bad request", 400))
 
-        if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int):
-            now = datetime.now()
-            duration_seconds = (now - script.started_at).seconds
-            if duration_seconds > SUB_PROCESS_TIMEOUT:
-                script.success = False
-                script.return_code = -1
-                script.finished_at = now
-                script.output = TIMEOUT_OUTPUT
-                db_session.commit()
-
         return make_response(({"script": script.to_dict()}, 200))
 
     except Exception as e:
@@ -186,7 +164,9 @@ def task_logs_api(dtable_uuid, script_name):
 
     db_session = DBSession()
     try:
-        task_logs = list_task_logs(db_session, dtable_uuid, script_name, order_by)
+        task_logs = list_task_logs(
+            db_session, uuid_str_to_32_chars(dtable_uuid), script_name, order_by
+        )
         count = task_logs.count()
         task_logs = task_logs[start:end]
         task_log_list = [task_log.to_dict() for task_log in task_logs]
@@ -286,16 +266,21 @@ def record_script_result():
     success = data.get("success", False)
     return_code = data.get("return_code")
     output = data.get("output")
-    spend_time = data.get("spend_time")
+    started_at = datetime.fromisoformat(data.get("started_at"))
+    spend_time = data.get("spend_time") or 0
     script_id = data.get("script_id")
-
-    db_session = DBSession()
-    # update script_log and run-time statistics
+    db_session = DBSession()
     try:
         if script_id:
-            hook_update_script(
-                db_session, script_id, success, return_code, output, spend_time
+            scheduler.script_done_callback(
+                db_session,
+                script_id,
+                success,
+                return_code,
+                output,
+                started_at,
+                spend_time,
             )
 
     except Exception as e:
@@ -386,5 +371,5 @@ def base_run_python_statistics():
 
 
 if __name__ == "__main__":
-    http_server = WSGIServer(("127.0.0.1", 5055), app)
+    http_server = WSGIServer((HOST, 5055), app)
     http_server.serve_forever()
diff --git a/scheduler/app/monitor.sh b/scheduler/app/monitor.sh
index b5f6dde..079664f 100755
--- a/scheduler/app/monitor.sh
+++ b/scheduler/app/monitor.sh
@@ -39,24 +39,11 @@ function monitor_flask_server() {
     fi
 }
 
-function monitor_scheduler() {
-    process_name="scheduler.py"
-    check_num=$(check_process $process_name)
-    if [ $check_num -eq 0 ]; then
-        log "Start $process_name"
-        sleep 0.2
-        cd /opt/scheduler/
-        python3 -u scheduler.py >> "${LOG_FILE}" 2>&1 &
-        sleep 0.2
-    fi
-}
-
 log "Start Monitor"
 
 while [ 1 ]; do
     monitor_flask_server
-    monitor_scheduler
     sleep 30
 done
diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py
index 7d6bfe1..5395c00 100644
--- a/scheduler/app/scheduler.py
+++ b/scheduler/app/scheduler.py
@@ -1,56 +1,93 @@
+import json
+import logging
 import os
-import gc
 import time
-import logging
-from threading import Thread
+from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime, timedelta
+from threading import Lock, Thread
 
 from database import DBSession
+from faas_scheduler.models import ScriptLog
 from faas_scheduler.utils import (
-    check_and_set_tasks_timeout,
+    add_script,
     delete_log_after_days,
     delete_statistics_after_days,
-    basic_log,
+    run_script,
+    get_script_file,
+    hook_update_script,
+    check_and_set_tasks_timeout,
 )
 
-basic_log("scheduler.log")
-
-SUB_PROCESS_TIMEOUT = int(
-    os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15)
-)  # 15 minutes
-
 logger = logging.getLogger(__name__)
 
+SUB_PROCESS_TIMEOUT = int(os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15))
+
 
-class FAASTaskTimeoutSetter(Thread):
+class Scheduler:
     def __init__(self):
-        super(FAASTaskTimeoutSetter, self).__init__()
-        self.interval = 60 * 30  # every half an hour
-
-    def run(self):
-        if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int):
-            while True:
-                logger.info("Start automatic cleanup ...")
-                db_session = DBSession()
-                try:
-                    check_and_set_tasks_timeout(db_session)
-                except Exception as e:
-                    logger.exception("task cleaner error: %s", e)
-                finally:
-                    db_session.close()
-
-                # python garbage collection
-                logger.info("gc.collect: %s", str(gc.collect()))
-
-                # remove old script_logs and statistics
+        self.executor = ThreadPoolExecutor()
+
+    def add(
+        self,
+        db_session,
+        dtable_uuid,
+        org_id,
+        owner,
+        script_name,
+        context_data,
+        operate_from,
+    ):
+        script_log = add_script(
+            db_session,
+            dtable_uuid,
+            owner,
+            org_id,
+            script_name,
+            context_data,
+            operate_from,
+        )
+        script_file_info = get_script_file(
+            script_log.dtable_uuid, script_log.script_name
+        )
+        self.executor.submit(
+            run_script,
+            script_log.id,
+            dtable_uuid,
+            script_name,
+            script_file_info["script_url"],
+            script_file_info["temp_api_token"],
+            context_data,
+        )
+        return script_log
+
+    def script_done_callback(
+        self,
+        db_session,
+        script_id,
+        success,
+        return_code,
+        output,
+        started_at,
+        spend_time,
+    ):
+        hook_update_script(
+            db_session, script_id, success, return_code, output, started_at, spend_time
+        )
+
+    def statistic_cleaner(self):
+        while True:
+            db_session = DBSession()
+            try:
                 delete_log_after_days(db_session)
                 delete_statistics_after_days(db_session)
+            except Exception as e:
+                logger.exception(e)
+            finally:
+                db_session.close()
+            time.sleep(24 * 60 * 60)
 
-                # sleep
-                logger.info("Sleep for %d seconds ...", self.interval)
-                time.sleep(self.interval)
+    def start(self):
+        Thread(target=self.statistic_cleaner, daemon=True).start()
 
 
-if __name__ == "__main__":
-    task_timeout_setter = FAASTaskTimeoutSetter()
-    task_timeout_setter.start()
+scheduler = Scheduler()
diff --git a/scheduler/app/scheduler.sh b/scheduler/app/scheduler.sh
index 27a2988..4feac61 100755
--- a/scheduler/app/scheduler.sh
+++ b/scheduler/app/scheduler.sh
@@ -8,7 +8,6 @@ fi
 
 function stop_server() {
     pkill -9 -f flask_server.py
-    pkill -9 -f scheduler.py
     pkill -9 -f monitor
     rm -f /opt/scheduler/pids/*.pid
 }
@@ -32,9 +31,6 @@ function start_server() {
     python3 -u flask_server.py >> "${LOG_FILE}" 2>&1 &
     sleep 0.2
 
-    python3 -u scheduler.py >> "${LOG_FILE}" 2>&1 &
-    sleep 0.2
-
     ./monitor.sh &
     #&>>/opt/scheduler/logs/monitor.log &
 
diff --git a/starter/runner.py b/starter/runner.py
index e4cb429..66f74e0 100644
--- a/starter/runner.py
+++ b/starter/runner.py
@@ -7,6 +7,7 @@
 import time
 import ast
 import sys
+from datetime import datetime
 from concurrent.futures import ThreadPoolExecutor
 from uuid import uuid4
 
@@ -22,6 +23,7 @@
 LOG_LEVEL = os.environ.get("PYTHON_STARTER_LOG_LEVEL", "INFO")
 TIME_ZONE = os.environ.get("TIME_ZONE", "")
 PYTHON_RUNNER_IMAGE = os.environ.get("PYTHON_RUNNER_IMAGE")
+IS_CHOWN_SCRIPT_DIR = os.environ.get("IS_CHOWN_SCRIPT_DIR", "true").lower() == "true"
 THREAD_COUNT = int(os.environ.get("PYTHON_STARTER_THREAD_COUNT", 10))
 
 SUB_PROCESS_TIMEOUT = int(os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15))  # 15 mins
@@ -70,6 +72,9 @@
 SEATABLE_USER_UID = 1000
 SEATABLE_USER_GID = 1000
 
+# bind host
+HOST = os.environ.get("PYTHON_STARTER_BIND_HOST", "127.0.0.1")
+
 
 def get_log_level(level):
     if level.lower() == "info":
@@ -132,12 +137,15 @@ def to_python_bool(value):
     return value.lower() == "true"
 
 
-def send_to_scheduler(success, return_code, output, spend_time, request_data):
+def send_to_scheduler(
+    success, return_code, output, started_at, spend_time, request_data
+):
     """
     This function is used to send result of script to scheduler
     - success: whether script running successfully
    - return_code: return-code of subprocess
     - output: output of subprocess or error message
+    - started_at: start timestamp
     - spend_time: time subprocess took
     - request_data: data from request
     """
@@ -152,7 +160,8 @@ def send_to_scheduler(success, return_code, output, spend_time, request_data):
         "success": success,
         "return_code": return_code,
         "output": output,
-        "spend_time": spend_time,
+        "started_at": datetime.fromtimestamp(started_at).isoformat(),
+        "spend_time": spend_time or 0,
     }
     result_data.update(
         {
@@ -186,9 +195,11 @@ def run_python(data):
(v%s)", VERSION) + started_at = time.time() + script_url = data.get("script_url") if not script_url: - send_to_scheduler(False, None, "Script URL is missing", None, data) + send_to_scheduler(False, None, "Script URL is missing", started_at, None, data) return if ( to_python_bool(USE_ALTERNATIVE_FILE_SERVER_ROOT) @@ -228,11 +239,11 @@ def run_python(data): logging.error( "Failed to get script from %s, response: %s", script_url, resp ) - send_to_scheduler(False, None, "Fail to get script", None, data) + send_to_scheduler(False, None, "Fail to get script", started_at, None, data) return except Exception as e: logging.error("Failed to get script from %s, error: %s", script_url, e) - send_to_scheduler(False, None, "Fail to get script", None, data) + send_to_scheduler(False, None, "Fail to get script", started_at, None, data) return logging.debug("Generate temporary random folder directory") @@ -264,13 +275,16 @@ def run_python(data): return_code, output = None, "" # init output except Exception as e: logging.error("Failed to save script %s, error: %s", script_url, e) + send_to_scheduler(False, -1, "", started_at, 0, data) return try: logging.debug("Fix ownership of %s", tmp_dir) - os.chown(tmp_dir, SEATABLE_USER_UID, SEATABLE_USER_GID) + if IS_CHOWN_SCRIPT_DIR: + os.chown(tmp_dir, SEATABLE_USER_UID, SEATABLE_USER_GID) except Exception as e: logging.error("Failed to chown %s, error: %s", tmp_dir, e) + send_to_scheduler(False, -1, "", started_at, 0, data) return logging.debug("prepare the command to start the python runner") @@ -339,8 +353,6 @@ def run_python(data): command.append("run") # override command logging.debug("command: %s", command) - start_at = time.time() - logging.debug("try to start the python runner image") try: result = subprocess.run( @@ -371,6 +383,7 @@ def run_python(data): False, -1, "The script's running time exceeded the limit and the execution was aborted.", + started_at, DEFAULT_SUB_PROCESS_TIMEOUT, data, ) @@ -378,7 +391,7 @@ def run_python(data): except Exception as e: logging.exception(e) logging.error("Failed to run file %s error: %s", script_url, e) - send_to_scheduler(False, None, None, None, data) + send_to_scheduler(False, None, None, started_at, None, data) return else: logging.debug( @@ -388,7 +401,12 @@ def run_python(data): if os.path.isfile(output_file_path): if os.path.islink(output_file_path): send_to_scheduler( - False, -1, "Script invalid!", time.time() - start_at, data + False, + -1, + "Script invalid!", + started_at, + time.time() - started_at, + data, ) return with open(output_file_path, "r") as f: @@ -418,7 +436,7 @@ def run_python(data): except Exception as e: logging.warning("Fail to remove container error: %s", e) - spend_time = time.time() - start_at + spend_time = time.time() - started_at logging.info("python run finished successful. duration was: %s", spend_time) logging.debug( "send this to the scheduler. return_code: %s, output: %s, spend_time: %s, data: %s", @@ -427,7 +445,9 @@ def run_python(data): spend_time, data, ) - send_to_scheduler(return_code == 0, return_code, output, spend_time, data) + send_to_scheduler( + return_code == 0, return_code, output, started_at, spend_time, data + ) #################### @@ -459,4 +479,4 @@ def health_check(): if __name__ == "__main__": - app.run(port=8088, debug=False) + app.run(host=HOST, port=8088, debug=False)