Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
3d4317a
perf: add etl task table
andhreljaKern Oct 24, 2025
c002a6e
perf(etl_task): add llm_config
andhreljaKern Oct 24, 2025
c12e193
perf: rename EtlTask class
andhreljaKern Oct 24, 2025
571073f
perf: add business objects
andhreljaKern Oct 24, 2025
6232a62
fix: optional file ids in etl task create
andhreljaKern Oct 24, 2025
2e7c924
perf: add file_path to etl_task
andhreljaKern Oct 24, 2025
d58550a
perf: file paths
andhreljaKern Oct 24, 2025
cce9ba4
perf: add tokenizer column
andhreljaKern Oct 25, 2025
2bea857
perf: add file_size col
andhreljaKern Oct 25, 2025
e6d425b
perf: add split config
andhreljaKern Oct 26, 2025
ac40aa9
perf: add split config to etl task
andhreljaKern Oct 26, 2025
bd79420
fix: set split types to chunk and shrink
andhreljaKern Oct 26, 2025
cec3c92
perf: update enum name
andhreljaKern Oct 26, 2025
ef216ee
perf: default ETLExtractorPDF to PDF2MD
andhreljaKern Oct 26, 2025
1e6dff4
perf: fkey alignment
andhreljaKern Oct 27, 2025
3cbab40
perf: add ETLTransformer
andhreljaKern Oct 27, 2025
ccfd2be
perf: remove deleted cols
andhreljaKern Oct 27, 2025
3dc3020
perf: add monitor.set_etl_task_to_failed
andhreljaKern Oct 28, 2025
2ddce6e
Merge branch 'cognition-etl-provider' of github.com:code-kern-ai/refi…
andhreljaKern Oct 28, 2025
1b39c44
perf: EnumKern
andhreljaKern Oct 28, 2025
c304bac
perf: add etl_task_id to integration records
andhreljaKern Oct 28, 2025
7923e19
perf: enum alignment
andhreljaKern Oct 29, 2025
57f9280
perf: add project update for integration
andhreljaKern Oct 29, 2025
0a58543
perf: etl and integration deletions
andhreljaKern Oct 29, 2025
9e069eb
fix: thread start in general
andhreljaKern Oct 29, 2025
99323bd
perf: add ETLExtractorPDF from_string
andhreljaKern Oct 29, 2025
3ed424f
perf: integration_objects cleanup
andhreljaKern Oct 29, 2025
18ab8e3
perf: etl_task.cache_config
andhreljaKern Oct 29, 2025
a3674ae
Merge branch 'dev' into cognition-etl-provider
andhreljaKern Oct 29, 2025
5549a61
Merge branch 'dev' into cognition-etl-provider
andhreljaKern Oct 30, 2025
298b517
perf: error print
andhreljaKern Oct 30, 2025
bf33020
Merge branch 'cognition-etl-provider' of github.com:code-kern-ai/refi…
andhreljaKern Oct 30, 2025
9a6149b
fix: overwrite etl_task_id fkey
andhreljaKern Oct 30, 2025
c77a031
perf: make get_or_create_etl_task a submodule function
andhreljaKern Oct 30, 2025
f5681d7
fix: get_or_create_etl_task args
andhreljaKern Oct 30, 2025
0396e82
perf: align to integrations
andhreljaKern Oct 30, 2025
7efb80f
chore: conflict resolution
andhreljaKern Oct 30, 2025
9c33c7e
perf: add ETLCacheKeys
andhreljaKern Oct 30, 2025
b7da826
Merge branch 'cognition-etl-provider' of https://github.com/code-kern…
andhreljaKern Oct 30, 2025
e103f2f
perf: add file-cache to ETLCacheKey enum
andhreljaKern Oct 30, 2025
293c149
perf: minor fixes
andhreljaKern Oct 30, 2025
2d91953
New table
JWittmeyer Nov 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion business_objects/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def force_remove_and_refresh_session_by_id(session_id: str) -> bool:
if session_id not in session_lookup:
return False
# context vars cant be closed from a different context but we can work around it by using a thread (which creates a new context) with the same id
daemon.run_without_db_token(__close_in_context(session_id))
daemon.run_without_db_token(__close_in_context, session_id)
return True


Expand Down
21 changes: 21 additions & 0 deletions business_objects/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from submodules.model.models import TaskQueue, Organization
from submodules.model.util import prevent_sql_injection
from submodules.model.session import session
from submodules.model.global_objects import etl_task as etl_task_db_bo
from submodules.model.cognition_objects import (
macro as macro_db_bo,
markdown_file as markdown_file_db_bo,
Expand Down Expand Up @@ -220,6 +221,26 @@ def set_integration_task_to_failed(
)


def set_etl_task_to_failed(
id: str,
is_active: bool = False,
error_message: Optional[str] = None,
state: Optional[
enums.CognitionMarkdownFileState
] = enums.CognitionMarkdownFileState.FAILED,
with_commit: bool = True,
) -> None:
# argument `state` is a workaround for cognition-gateway/api/routes/integrations.delete_many
etl_task_db_bo.update(
id=id,
state=state,
finished_at=datetime.datetime.now(datetime.timezone.utc),
is_active=is_active,
error_message=error_message,
with_commit=with_commit,
)


def __select_running_information_source_payloads(
project_id: Optional[str] = None,
only_running: bool = False,
Expand Down
16 changes: 16 additions & 0 deletions cognition_objects/environment_variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ def get_by_name_and_org_id(
)


def get_by_id_and_org_id(
org_id: str,
id: str,
) -> CognitionEnvironmentVariable:

return (
session.query(CognitionEnvironmentVariable)
.filter(
CognitionEnvironmentVariable.organization_id == org_id,
CognitionEnvironmentVariable.project_id == None,
CognitionEnvironmentVariable.id == id,
)
.first()
)


def get_dataset_env_var_value(
dataset_id: str, org_id: str, scope: Literal["extraction", "transformation"]
) -> Union[str, None]:
Expand Down
16 changes: 16 additions & 0 deletions cognition_objects/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from sqlalchemy import func
from sqlalchemy.orm.attributes import flag_modified


from ..business_objects import general
from ..integration_objects import manager as integration_manager_db_bo
from ..session import session
from ..models import CognitionIntegration, CognitionGroup
from ..enums import (
Expand Down Expand Up @@ -200,6 +202,7 @@ def create(

def update(
id: str,
project_id: Optional[str] = None,
updated_by: Optional[str] = None,
name: Optional[str] = None,
description: Optional[str] = None,
Expand All @@ -219,6 +222,8 @@ def update(
if not integration:
return None

if project_id is not None and integration.project_id is None:
integration.project_id = project_id
if updated_by is not None:
integration.updated_by = updated_by
if name is not None:
Expand Down Expand Up @@ -278,6 +283,16 @@ def execution_finished(id: str) -> bool:
def delete_many(
ids: List[str], delete_cognition_groups: bool = True, with_commit: bool = True
) -> None:
for id in ids:
integration_records, IntegrationModel = (
integration_manager_db_bo.get_all_by_integration_id(id)
)
integration_manager_db_bo.delete_many(
IntegrationModel,
ids=[rec.id for rec in integration_records],
with_commit=True,
)

(
session.query(CognitionIntegration)
.filter(CognitionIntegration.id.in_(ids))
Expand All @@ -289,6 +304,7 @@ def delete_many(
.filter(CognitionGroup.meta_data.op("->>")("integration_id").in_(ids))
.delete(synchronize_session=False)
)

general.flush_or_commit(with_commit)


Expand Down
18 changes: 17 additions & 1 deletion cognition_objects/markdown_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

from ..business_objects import general
from ..session import session
from ..models import CognitionMarkdownDataset, Project
from ..models import CognitionMarkdownDataset, CognitionMarkdownFile, Project
from ..enums import Tablenames, MarkdownFileCategoryOrigin
from ..util import prevent_sql_injection
from .markdown_file import delete_many as delete_many_md_files


def get(org_id: str, id: str) -> CognitionMarkdownDataset:
Expand Down Expand Up @@ -184,6 +185,21 @@ def delete_many(org_id: str, dataset_ids: List[str], with_commit: bool = True) -
),
).delete(synchronize_session=False)

md_file_ids = (
session.query(CognitionMarkdownFile.id)
.filter(
CognitionMarkdownFile.organization_id == org_id,
CognitionMarkdownFile.dataset_id.in_(dataset_ids),
)
.all()
)

delete_many_md_files(
org_id=org_id,
md_file_ids=[md_file_id for (md_file_id,) in md_file_ids],
with_commit=True,
)

session.query(CognitionMarkdownDataset).filter(
CognitionMarkdownDataset.organization_id == org_id,
CognitionMarkdownDataset.id.in_(dataset_ids),
Expand Down
35 changes: 31 additions & 4 deletions cognition_objects/markdown_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .. import enums
from ..business_objects import general
from ..session import session
from ..models import CognitionMarkdownFile
from ..models import CognitionMarkdownFile, EtlTask
from ..util import prevent_sql_injection


Expand All @@ -19,6 +19,17 @@ def get(org_id: str, md_file_id: str) -> CognitionMarkdownFile:
)


def get_by_etl_task_id(org_id: str, etl_task_id: str) -> CognitionMarkdownFile:
return (
session.query(CognitionMarkdownFile)
.filter(
CognitionMarkdownFile.organization_id == org_id,
CognitionMarkdownFile.etl_task_id == etl_task_id,
)
.first()
)


def get_enriched(org_id: str, md_file_id: str) -> Dict[str, Any]:
org_id = prevent_sql_injection(org_id, isinstance(org_id, str))
md_file_id = prevent_sql_injection(md_file_id, isinstance(org_id, str))
Expand Down Expand Up @@ -71,8 +82,12 @@ def __get_enriched_query(
)
else:
mf_select = "mf.*"
et_state = "et.state"
mf_state = "mf.state"

query = f"""SELECT {mf_select} FROM cognition.markdown_file mf
query = f"""SELECT {mf_select}, COALESCE({et_state}, {mf_state}) AS etl_state
FROM cognition.markdown_file mf
LEFT JOIN global.etl_task et ON mf.etl_task_id = et.id
"""
query += f"WHERE mf.organization_id = '{org_id}' {where_add}"
query += query_add
Expand Down Expand Up @@ -175,6 +190,7 @@ def update(
finished_at: Optional[datetime] = None,
error: Optional[str] = None,
meta_data: Optional[Dict[str, Any]] = None,
etl_task_id: Optional[Dict[str, Any]] = None,
overwrite_meta_data: bool = True,
with_commit: bool = True,
) -> CognitionMarkdownFile:
Expand All @@ -199,22 +215,33 @@ def update(
markdown_file.meta_data = meta_data
else:
markdown_file.meta_data = {**markdown_file.meta_data, **meta_data}
if etl_task_id is not None:
markdown_file.etl_task_id = etl_task_id
general.flush_or_commit(with_commit)

return markdown_file


def delete(org_id: str, md_file_id: str, with_commit: bool = True) -> None:
session.query(CognitionMarkdownFile).filter(
md_file = session.query(CognitionMarkdownFile).filter(
CognitionMarkdownFile.organization_id == org_id,
CognitionMarkdownFile.id == md_file_id,
)
session.query(EtlTask).filter(
EtlTask.organization_id == org_id, EtlTask.id == md_file.etl_task_id
).delete()
md_file.delete()
general.flush_or_commit(with_commit)


def delete_many(org_id: str, md_file_ids: List[str], with_commit: bool = True) -> None:
session.query(CognitionMarkdownFile).filter(
md_files = session.query(CognitionMarkdownFile).filter(
CognitionMarkdownFile.organization_id == org_id,
CognitionMarkdownFile.id.in_(md_file_ids),
)
session.query(EtlTask).filter(
EtlTask.organization_id == org_id,
EtlTask.id.in_([mf.etl_task_id for mf in md_files]),
).delete(synchronize_session=False)
md_files.delete(synchronize_session=False)
general.flush_or_commit(with_commit)
116 changes: 101 additions & 15 deletions enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,21 @@
from typing import Any


class EnumKern(Enum):
@classmethod
def all(cls):
return [e.value for e in cls]

@classmethod
def from_string(cls, value: str):
changed_value = value.upper().replace(" ", "_").replace("-", "_")
for member in cls:
if member.value == changed_value:
return member
print(f"ERROR: unknown enum {cls.__name__}: {value}", flush=True)
raise ValueError(f"Unknown enum {cls.__name__}: {value}")


class DataTypes(Enum):
INTEGER = "INTEGER"
FLOAT = "FLOAT"
Expand Down Expand Up @@ -178,6 +193,8 @@ class Tablenames(Enum):
ADMIN_QUERY_MESSAGE_SUMMARY = "admin_query_message_summary"
RELEASE_NOTIFICATION = "release_notification"
TIMED_EXECUTIONS = "timed_executions"
ETL_TASK = "etl_task"
ETL_CONFIG_PRESET = "etl_config_preset"

def snake_case_to_pascal_case(self):
# the type name (written in PascalCase) of a table is needed to create backrefs
Expand Down Expand Up @@ -470,22 +487,18 @@ class TokenScope(Enum):
READ = "READ"
READ_WRITE = "READ_WRITE"

def all():
return [
TokenScope.READ.value,
TokenScope.READ_WRITE.value,
]
@classmethod
def all(cls):
return [e.value for e in cls]


class TokenSubject(Enum):
PROJECT = Tablenames.PROJECT.value.upper()
MARKDOWN_DATASET = Tablenames.MARKDOWN_DATASET.value.upper()

def all():
return [
TokenSubject.PROJECT.value,
TokenSubject.MARKDOWN_DATASET.value,
]
@classmethod
def all(cls):
return [e.value for e in cls]


class TokenizationTaskTypes(Enum):
Expand Down Expand Up @@ -517,6 +530,7 @@ class TaskType(Enum):
RUN_COGNITION_MACRO = "RUN_COGNITION_MACRO"
PARSE_COGNITION_FILE = "PARSE_COGNITION_FILE"
EXECUTE_INTEGRATION = "EXECUTE_INTEGRATION"
EXECUTE_ETL = "EXECUTE_ETL"


class TaskQueueAction(Enum):
Expand Down Expand Up @@ -807,11 +821,9 @@ class MacroType(Enum):
DOCUMENT_MESSAGE_QUEUE = "DOCUMENT_MESSAGE_QUEUE"
FOLDER_MESSAGE_QUEUE = "FOLDER_MESSAGE_QUEUE"

def all():
return [
MacroType.DOCUMENT_MESSAGE_QUEUE.value,
MacroType.FOLDER_MESSAGE_QUEUE.value,
]
@classmethod
def all(cls):
return [e.value for e in cls]


# currently only one option, but could be extended in the future
Expand Down Expand Up @@ -1017,3 +1029,77 @@ class MessageInitiationType(Enum):

class TimedExecutionKey(Enum):
LAST_RESET_USER_MESSAGE_COUNT = "LAST_RESET_USER_MESSAGE_COUNT"


class ETLSplitStrategy(EnumKern):
CHUNK = "CHUNK"
SHRINK = "SHRINK"


class ETLFileType(Enum):
PDF = "PDF"
WORD = "WORD"
MD = "MD"

@classmethod
def from_string(cls, value: str):
changed_value = value.upper().replace(" ", "_").replace("-", "_")
for member in cls:
if member.value == changed_value:
return member
print(
f"WARNING: unknown enum {cls.__name__}: {value}, defaulting to {cls.__name__}.MD",
flush=True,
)
return cls.MD


class ETLExtractorMD(EnumKern):
FILESYSTEM = "FILESYSTEM"


class ETLExtractorPDF(Enum):
VISION = "VISION"
AZURE_DI = "AZURE_DI"
PDF2MD = "PDF2MD"

@classmethod
def from_string(cls, value: str):
changed_value = value.upper().replace(" ", "_").replace("-", "_")
for member in cls:
if member.value == changed_value:
return member
if changed_value == "PDF2MARKDOWN":
return cls.PDF2MD
return cls.VISION


class ETLExtractorWord(EnumKern):
FILESYSTEM = "FILESYSTEM"


class ETLExtractor:
MD = ETLExtractorMD
PDF = ETLExtractorPDF
WORD = ETLExtractorWord

@classmethod
def from_string(cls, value: str):
changed_value = value.upper().replace(" ", "_").replace("-", "_")
for member in cls:
if member.name == changed_value:
return member
raise ValueError(f"ERROR: Unknown enum {cls.__name__}: {value}")


class ETLTransformer(EnumKern):
SUMMARIZE = "SUMMARIZE"
CLEANSE = "CLEANSE"
TEXT_TO_TABLE = "TEXT_TO_TABLE"


class ETLCacheKeys(EnumKern):
FILE_CACHE = "use_file_cache"
EXTRACTION = "use_extraction_cache"
SPLITTING = "use_splitting_cache"
TRANSFORMATION = "use_transformation_cache"
Loading