Skip to content

Commit 4513119

Browse files
committed
Upgrade df version
Upgrade df version to 50.3.0 and arrow/parquet to 57.0.0
1 parent 0c35ecb commit 4513119

File tree

12 files changed: +1,370 additions, −598 deletions

Cargo.lock

Lines changed: 1207 additions & 528 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,22 @@ build = "build.rs"
99

1010
[dependencies]
1111
# Arrow and DataFusion ecosystem
12-
arrow = "54.0.0"
13-
arrow-array = "54.0.0"
14-
arrow-flight = { version = "54.0.0", features = ["tls"] }
15-
arrow-ipc = { version = "54.0.0", features = ["zstd"] }
16-
arrow-json = "54.0.0"
17-
arrow-schema = { version = "54.0.0", features = ["serde"] }
18-
arrow-select = "54.0.0"
19-
datafusion = "45.0.0"
20-
object_store = { version = "0.11.2", features = [
12+
arrow = "57.0.0"
13+
arrow-array = "57.0.0"
14+
arrow-flight = { version = "57.0.0", features = ["tls-aws-lc","tls-native-roots"] }
15+
arrow-ipc = { version = "57.0.0", features = ["zstd"] }
16+
arrow-json = "57.0.0"
17+
arrow-schema = { version = "57.0.0", features = ["serde"] }
18+
arrow-select = "57.0.0"
19+
# datafusion = "50.3.0"
20+
datafusion = {git="https://github.com/apache/datafusion"}
21+
object_store = { version = "0.12.4", features = [
2122
"cloud",
2223
"aws",
2324
"azure",
2425
"gcp",
2526
] }
26-
parquet = "54.0.0"
27+
parquet = "57.0.0"
2728

2829
# Web server and HTTP-related
2930
actix-cors = "0.7.0"
@@ -33,8 +34,9 @@ actix-web-prometheus = { version = "0.1" }
3334
actix-web-static-files = "4.0"
3435
http = "0.2.7"
3536
http-auth-basic = "0.3.3"
36-
tonic = { version = "0.12.3", features = ["tls", "transport", "gzip", "zstd", "prost"] }
37-
tonic-web = "0.12.3"
37+
tonic = { version = "0.14.1", features = ["tls-aws-lc", "tls-native-roots", "transport", "gzip", "zstd"] }
38+
tonic-prost = "0.14.1"
39+
tonic-web = "0.14.1"
3840
tower-http = { version = "0.6.1", features = ["cors"] }
3941
url = "2.4.0"
4042

@@ -126,12 +128,13 @@ itertools = "0.14"
126128
once_cell = "1.20"
127129
rayon = "1.8"
128130
rand = "0.8.5"
129-
regex = "1.7.3"
131+
regex = "1.12.2"
130132
reqwest = { version = "0.11.27", default-features = false, features = [
131133
"rustls-tls",
132134
"json",
133135
"gzip",
134136
"brotli",
137+
"stream"
135138
] } # cannot update cause rustls is not latest `see rustls`
136139
semver = "1.0"
137140
static-files = "0.2"

src/catalog/column.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ impl TryFrom<&Statistics> for TypedStatistics {
156156
max: *stats.max_opt().expect("Int64 stats max not set"),
157157
}),
158158
Statistics::Int96(stats) => TypedStatistics::Int(Int64Type {
159-
min: stats.min_opt().expect("Int96 stats min not set").to_i64(),
160-
max: stats.max_opt().expect("Int96 stats max not set").to_i64(),
159+
min: int96_to_i64_nanos(stats.min_opt().expect("Int96 stats min not set")),
160+
max: int96_to_i64_nanos(stats.max_opt().expect("Int96 stats max not set")),
161161
}),
162162
Statistics::Float(stats) => TypedStatistics::Float(Float64Type {
163163
min: *stats.min_opt().expect("Float32 stats min not set") as f64,
@@ -196,3 +196,21 @@ impl TryFrom<&Statistics> for TypedStatistics {
196196
Ok(res)
197197
}
198198
}
199+
200+
// Int96 is a deprecated timestamp format used by legacy Impala files
201+
// Convert to i64 nanoseconds since Unix epoch for statistics
202+
fn int96_to_i64_nanos(int96: &parquet::data_type::Int96) -> i64 {
203+
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588; // Julian day for 1970-01-01
204+
const SECONDS_PER_DAY: i64 = 86_400;
205+
const NANOS_PER_SECOND: i64 = 1_000_000_000;
206+
207+
// Extract nanoseconds from first 8 bytes (little-endian)
208+
let nanos_of_day = int96.data()[0] as i64 | ((int96.data()[1] as i64) << 32);
209+
210+
// Extract Julian day from last 4 bytes
211+
let julian_day = int96.data()[2] as i64;
212+
213+
// Convert to nanoseconds since Unix epoch
214+
let days_since_epoch = julian_day - JULIAN_DAY_OF_EPOCH;
215+
days_since_epoch * SECONDS_PER_DAY * NANOS_PER_SECOND + nanos_of_day
216+
}

src/catalog/manifest.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
use std::collections::HashMap;
2020

2121
use itertools::Itertools;
22-
use parquet::{file::reader::FileReader, format::SortingColumn};
22+
use parquet::file::{
23+
metadata::{RowGroupMetaData, SortingColumn},
24+
reader::FileReader,
25+
};
2326

2427
use crate::metastore::metastore_traits::MetastoreObject;
2528

@@ -170,9 +173,7 @@ fn sort_order(
170173
sort_orders
171174
}
172175

173-
fn column_statistics(
174-
row_groups: &[parquet::file::metadata::RowGroupMetaData],
175-
) -> HashMap<String, Column> {
176+
fn column_statistics(row_groups: &[RowGroupMetaData]) -> HashMap<String, Column> {
176177
let mut columns: HashMap<String, Column> = HashMap::new();
177178
for row_group in row_groups {
178179
for col in row_group.columns() {

src/parseable/staging/reader.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,8 @@ mod tests {
342342
types::Int64Type,
343343
};
344344
use arrow_ipc::writer::{
345-
DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter, write_message,
345+
CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
346+
write_message,
346347
};
347348
use arrow_schema::{DataType, Field, Schema};
348349
use chrono::Utc;
@@ -433,6 +434,7 @@ mod tests {
433434
let options = IpcWriteOptions::default();
434435
let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
435436
let data_gen = IpcDataGenerator {};
437+
let mut compression_context = CompressionContext::default();
436438

437439
let mut buf = Vec::new();
438440
let rb1 = rb(1);
@@ -446,7 +448,12 @@ mod tests {
446448

447449
for i in (1..=3).cycle().skip(1).take(10000) {
448450
let (_, encoded_message) = data_gen
449-
.encoded_batch(&rb(i), &mut dictionary_tracker, &options)
451+
.encode(
452+
&rb(i),
453+
&mut dictionary_tracker,
454+
&options,
455+
&mut compression_context,
456+
)
450457
.unwrap();
451458
write_message(&mut buf, encoded_message, &options).unwrap();
452459
}

src/parseable/streams.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,9 @@ use parquet::{
3535
arrow::ArrowWriter,
3636
basic::Encoding,
3737
file::{
38-
FOOTER_SIZE, properties::WriterProperties, reader::FileReader,
38+
FOOTER_SIZE, metadata::SortingColumn, properties::WriterProperties, reader::FileReader,
3939
serialized_reader::SerializedFileReader,
4040
},
41-
format::SortingColumn,
4241
schema::types::ColumnPath,
4342
};
4443
use relative_path::RelativePathBuf;

src/query/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@ use actix_web::Either;
2424
use chrono::NaiveDateTime;
2525
use chrono::{DateTime, Duration, Utc};
2626
use datafusion::arrow::record_batch::RecordBatch;
27-
use datafusion::catalog::resolve_table_references;
2827
use datafusion::common::tree_node::Transformed;
29-
use datafusion::execution::disk_manager::DiskManagerConfig;
28+
use datafusion::execution::disk_manager::DiskManager;
3029
use datafusion::execution::{SendableRecordBatchStream, SessionState, SessionStateBuilder};
3130
use datafusion::logical_expr::expr::Alias;
3231
use datafusion::logical_expr::{
3332
Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
3433
};
3534
use datafusion::prelude::*;
3635
use datafusion::sql::parser::DFParser;
36+
use datafusion::sql::resolve::resolve_table_references;
3737
use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
3838
use itertools::Itertools;
3939
use once_cell::sync::Lazy;
@@ -120,7 +120,7 @@ impl Query {
120120
fn create_session_state(storage: Arc<dyn ObjectStorageProvider>) -> SessionState {
121121
let runtime_config = storage
122122
.get_datafusion_runtime()
123-
.with_disk_manager(DiskManagerConfig::NewOs);
123+
.with_disk_manager_builder(DiskManager::builder());
124124
let (pool_size, fraction) = match PARSEABLE.options.query_memory_pool_size {
125125
Some(size) => (size, 1.),
126126
None => {
@@ -231,6 +231,7 @@ impl Query {
231231
self.time_range.end.naive_utc(),
232232
);
233233
LogicalPlan::Explain(Explain {
234+
explain_format: plan.explain_format,
234235
verbose: plan.verbose,
235236
stringified_plans: vec![
236237
transformed

0 commit comments

Comments (0)