Add support for replace in incremental scan #8
Just a couple clarifications
/// 1. Files to compact: Vec<String> of existing file names that are being compacted
/// 2. Target file: String name of the new compacted file
///
/// Example: `Replace(vec!["file-a.parquet", "file-b.parquet"], "file-a-b-compacted.parquet")`
Is this how Iceberg engines do it too? How do they retarget positional delete files to file-a-b-compacted.parquet?
What Spark does is essentially this: file-a-b-compacted.parquet contains the records of file-a plus file-b, minus the rows removed by positional deletes (and equality deletes). However, the existing delete files for file-a and file-b remain in place.
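The behaviour described in this comment can be sketched roughly as follows. This is only an illustration, not the PR's code or Spark's implementation: `DataFile`, `PositionalDeletes`, and `compact` are hypothetical names, rows are modelled as plain integers, and only positional deletes are handled (equality deletes are left out).

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical in-memory stand-in for a data file: a name plus its rows in order.
struct DataFile {
    name: String,
    rows: Vec<i64>,
}

/// Hypothetical positional deletes: data file name -> set of 0-based deleted row positions.
type PositionalDeletes = HashMap<String, HashSet<usize>>;

/// Concatenate the live rows of `inputs` into a new file named `target`.
/// `deletes` is only read, mirroring the point that the existing delete
/// files of the inputs remain in place.
fn compact(inputs: &[DataFile], deletes: &PositionalDeletes, target: &str) -> DataFile {
    let mut rows = Vec::new();
    for file in inputs {
        let deleted = deletes.get(&file.name);
        for (pos, row) in file.rows.iter().enumerate() {
            // Skip rows whose position is marked deleted for this file.
            let is_deleted = deleted.map_or(false, |d| d.contains(&pos));
            if !is_deleted {
                rows.push(*row);
            }
        }
    }
    DataFile {
        name: target.to_string(),
        rows,
    }
}
```

With file-a holding three rows, one of them positionally deleted, and file-b holding two rows, the compacted file would hold four rows, and the delete entry for file-a would still exist afterwards.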
}

Operation::Replace(files_to_compact, target_file) => {
    // Replace operation: compact existing files into a new file
This function is very large. I think it'd be nice to pull the whole replace handling out into its own function, replace_operation, that is called upon matching Operation::Replace.
Done
// Snapshot 1: Empty starting point
Operation::Add(vec![], "empty.parquet".to_string()),
// Snapshot 2: Add file-a with 3 rows
Operation::Add(
Can we add a test with positional deletes before and after the replace operation?
Done
Adds support for replace operations in snapshot histories for incremental scans.
Even though replace operations logically keep the data the same, we still report file additions and deletions, because the physical layout changes and the files to which the rows belong change. This is necessary for incremental scan users who want to base change tracking on file identifiers.
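As a rough illustration of the file-level reporting described above, the sketch below maps a single operation to the files it adds and removes. It assumes a simplified stand-in for the PR's `Operation` enum (the row type of `Add` is a guess) and uses hypothetical names `FileChanges` and `file_changes`. It is not the actual implementation in this PR.

```rust
/// Hypothetical summary of the file-level effect of one snapshot.
#[derive(Debug, Default, PartialEq)]
struct FileChanges {
    added: Vec<String>,
    removed: Vec<String>,
}

/// Simplified stand-in for the enum in the diff; the `Vec<i64>` row type is an assumption.
enum Operation {
    /// Rows to write, and the name of the new data file.
    Add(Vec<i64>, String),
    /// Files to compact, and the name of the compacted target file.
    Replace(Vec<String>, String),
}

/// Report which files a single operation adds and removes.
/// A replace keeps the logical data the same, but it is still reported as
/// one added file (the target) and several removed files (the inputs).
fn file_changes(op: &Operation) -> FileChanges {
    match op {
        Operation::Add(_rows, file) => FileChanges {
            added: vec![file.clone()],
            removed: Vec::new(),
        },
        Operation::Replace(files_to_compact, target_file) => FileChanges {
            added: vec![target_file.clone()],
            removed: files_to_compact.clone(),
        },
    }
}
```

A consumer that tracks changes by file identifier could then treat the compacted file as newly added and the inputs as deleted, even though no rows changed logically.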