15 changes: 8 additions & 7 deletions src/Databases/DataLake/DatabaseDataLake.cpp
@@ -434,17 +434,18 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
         configuration,
         configuration->createObjectStorage(context_copy, /* is_readonly */ false),
         StorageID(getDatabaseName(), name),
-        /* columns */columns,
-        /* constraints */ConstraintsDescription{},
-        /* partition_by */nullptr,
+        /* columns */ columns,
+        /* constraints */ ConstraintsDescription{},
+        /* partition_by */ nullptr,
         context_copy,
-        /* comment */"",
+        /* comment */ "",
         getFormatSettings(context_copy),
         LoadingStrictnessLevel::CREATE,
         getCatalog(),
-        /* if_not_exists*/true,
-        /* is_datalake_query*/true,
-        /* lazy_init */true);
+        /* if_not_exists */ true,
+        /* is_datalake_query */ true,
+        /* is_table_function */ true,
+        /* lazy_init */ true);
 }
 
 void DatabaseDataLake::dropTable( /// NOLINT
2 changes: 2 additions & 0 deletions src/Storages/IStorageCluster.cpp
@@ -288,6 +288,8 @@ void IStorageCluster::read(
         return;
     }
 
+    updateConfigurationIfNeeded(context);
+
     storage_snapshot->check(column_names);
 
     const auto & settings = context->getSettingsRef();
2 changes: 2 additions & 0 deletions src/Storages/IStorageCluster.h
@@ -97,6 +97,8 @@ class IStorageCluster : public IStorage
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName());
     }
 
+    virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {}
+
 private:
     static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0);
 
16 changes: 16 additions & 0 deletions src/Storages/ObjectStorage/Local/Configuration.cpp
@@ -81,4 +81,20 @@ StorageObjectStorageQuerySettings StorageLocalConfiguration::getQuerySettings(co
         .ignore_non_existent_file = false};
 }
 
+ASTPtr StorageLocalConfiguration::createArgsWithAccessData() const
+{
+    auto arguments = std::make_shared<ASTExpressionList>();
+
+    arguments->children.push_back(std::make_shared<ASTLiteral>(path.path));
+    if (getFormat() != "auto")
+        arguments->children.push_back(std::make_shared<ASTLiteral>(getFormat()));
+    if (getStructure() != "auto")
+        arguments->children.push_back(std::make_shared<ASTLiteral>(getStructure()));
+    if (getCompressionMethod() != "auto")
+        arguments->children.push_back(std::make_shared<ASTLiteral>(getCompressionMethod()));
+
+    return arguments;
+}
+
+
 }
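Note: `createArgsWithAccessData()` re-serializes the stored configuration into an explicit table-function argument list: the path first, then format, structure, and compression method, each only when it is not `auto`. A minimal sketch of the call shape this produces, with a hypothetical path:

```sql
-- Hypothetical argument list produced for a local Iceberg table:
-- path first, then only the non-'auto' optional arguments.
SELECT * FROM icebergLocal('/iceberg_data/default/t1/', 'Parquet');
```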
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/Local/Configuration.h
@@ -60,6 +60,8 @@ class StorageLocalConfiguration : public StorageObjectStorageConfiguration

     void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr, bool) override { }
 
+    ASTPtr createArgsWithAccessData() const override;
+
 private:
     void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
     void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
19 changes: 19 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -91,6 +91,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
     std::shared_ptr<DataLake::ICatalog> catalog,
     bool if_not_exists,
     bool is_datalake_query,
+    bool is_table_function,
     bool lazy_init)
     : IStorageCluster(
         cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name)))
@@ -145,6 +146,10 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
         tryLogCurrentException(log_);
     }
 
+    // For tables, the configuration has to be refreshed on each read,
+    // because the underlying data may have changed since the previous update.
+    update_configuration_on_read_write = !is_table_function;
+
     ColumnsDescription columns{columns_in_table_or_function_definition};
     std::string sample_path;
     if (need_resolve_columns_or_format)
@@ -295,6 +300,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
{"IcebergS3", "icebergS3"},
{"IcebergAzure", "icebergAzure"},
{"IcebergHDFS", "icebergHDFS"},
{"IcebergLocal", "icebergLocal"},
{"DeltaLake", "deltaLake"},
{"DeltaLakeS3", "deltaLakeS3"},
{"DeltaLakeAzure", "deltaLakeAzure"},
@@ -416,6 +422,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
{"icebergS3", "icebergS3Cluster"},
{"icebergAzure", "icebergAzureCluster"},
{"icebergHDFS", "icebergHDFSCluster"},
{"icebergLocal", "icebergLocalCluster"},
{"deltaLake", "deltaLakeCluster"},
{"deltaLakeS3", "deltaLakeS3Cluster"},
{"deltaLakeAzure", "deltaLakeAzureCluster"},
@@ -741,6 +748,18 @@ IDataLakeMetadata * StorageObjectStorageCluster::getExternalMetadata(ContextPtr
     return configuration->getExternalMetadata();
 }
 
+void StorageObjectStorageCluster::updateConfigurationIfNeeded(ContextPtr context)
+{
+    if (update_configuration_on_read_write)
+    {
+        configuration->update(
+            object_storage,
+            context,
+            /* if_not_updated_before */ false,
+            /* check_consistent_with_previous_metadata */ false);
+    }
+}
+
 void StorageObjectStorageCluster::checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const
 {
     if (getClusterName(context).empty())
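With `update_configuration_on_read_write` set for tables (but not for table functions, which are rebuilt per query), every read refreshes the data-lake metadata first. A hedged sketch of the observable effect, with a hypothetical table and writer:

```sql
-- Hypothetical illustration: a table over an Iceberg location now re-reads
-- metadata on each SELECT, so new snapshots appear without DETACH/ATTACH.
CREATE TABLE iceberg_local_table ENGINE = IcebergLocal('/iceberg_data/default/t1/');

SELECT count() FROM iceberg_local_table;  -- sees snapshot N
-- ... an external writer (e.g. Spark) commits snapshot N+1 ...
SELECT count() FROM iceberg_local_table;  -- sees snapshot N+1
```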
6 changes: 5 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -25,7 +25,8 @@ class StorageObjectStorageCluster : public IStorageCluster
         std::shared_ptr<DataLake::ICatalog> catalog,
         bool if_not_exists,
         bool is_datalake_query,
-        bool lazy_init = false);
+        bool is_table_function,
+        bool lazy_init);
 
     std::string getName() const override;

@@ -154,6 +155,8 @@ class StorageObjectStorageCluster : public IStorageCluster
         ContextPtr context,
         bool async_insert) override;
 
+    void updateConfigurationIfNeeded(ContextPtr context) override;
+
     /*
     In case the table was created with the `object_storage_cluster` setting,
     modify the AST query object so that it uses the table function implementation
@@ -176,6 +179,7 @@

     /// Non-clustered storage to fall back on the pure implementation if needed.
     std::shared_ptr<StorageObjectStorage> pure_storage;
+    bool update_configuration_on_read_write;
 };
 
 }
7 changes: 7 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h
@@ -155,6 +155,13 @@ struct IcebergHDFSClusterDefinition
     static constexpr auto non_clustered_storage_engine_name = IcebergHDFSDefinition::storage_engine_name;
 };
 
+struct IcebergLocalClusterDefinition
+{
+    static constexpr auto name = "icebergLocalCluster";
+    static constexpr auto storage_engine_name = "IcebergLocalCluster";
+    static constexpr auto non_clustered_storage_engine_name = IcebergLocalDefinition::storage_engine_name;
+};
+
 struct DeltaLakeClusterDefinition
 {
     static constexpr auto name = "deltaLakeCluster";
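The three names in `IcebergLocalClusterDefinition` tie the feature together: `icebergLocalCluster` is the SQL table function, `IcebergLocalCluster` the engine identifier, and the non-clustered engine name is reused for single-node fallback. A usage sketch, with a hypothetical cluster and path:

```sql
-- Hypothetical: read a locally stored Iceberg table on every node of a cluster.
SELECT count()
FROM icebergLocalCluster('cluster_simple', '/iceberg_data/default/t1/', 'Parquet');
```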
4 changes: 3 additions & 1 deletion src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
@@ -98,7 +98,9 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
         args.mode,
         configuration->getCatalog(context, args.query.attach),
         args.query.if_not_exists,
-        /* is_datalake_query*/ false);
+        /* is_datalake_query */ false,
+        /* is_table_function */ false,
+        /* lazy_init */ false);
 }
 
 #endif
16 changes: 5 additions & 11 deletions src/TableFunctions/TableFunctionObjectStorage.cpp
@@ -204,9 +204,11 @@ StoragePtr TableFunctionObjectStorage<Definition, Configuration, is_data_lake>::
         /* comment */ String{},
         /* format_settings */ std::nullopt, /// No format_settings
         /* mode */ LoadingStrictnessLevel::CREATE,
-        configuration->getCatalog(context, /*attach*/ false),
+        configuration->getCatalog(context, /* attach */ false),
         /* if_not_exists */ false,
-        /* is_datalake_query*/ false);
+        /* is_datalake_query */ false,
+        /* is_table_function */ true,
+        /* lazy_init */ false);
 
     storage->startup();
     return storage;
@@ -296,6 +298,7 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConf

 #if USE_AVRO
 template class TableFunctionObjectStorage<IcebergClusterDefinition, StorageIcebergConfiguration, true>;
+template class TableFunctionObjectStorage<IcebergLocalClusterDefinition, StorageLocalIcebergConfiguration, true>;
 #endif
 
 #if USE_AVRO && USE_AWS_S3
@@ -334,13 +337,4 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
         .allow_readonly = false});
 }
 #endif
-
-
-void registerDataLakeTableFunctions(TableFunctionFactory & factory)
-{
-    UNUSED(factory);
-#if USE_AVRO
-    registerTableFunctionIceberg(factory);
-#endif
-}
 }
10 changes: 9 additions & 1 deletion src/TableFunctions/TableFunctionObjectStorageCluster.cpp
@@ -76,7 +76,8 @@ StoragePtr TableFunctionObjectStorageCluster<Definition, Configuration, is_data_
         /* mode */ LoadingStrictnessLevel::CREATE,
         /* catalog*/ nullptr,
         /* if_not_exists */ false,
-        /* is_datalake_query*/ false,
+        /* is_datalake_query */ false,
+        /* is_table_function */ true,
         /* lazy_init */ true);
 }
 
@@ -155,6 +156,13 @@ void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
             .category = FunctionDocumentation::Category::TableFunction},
         .allow_readonly = false});
 
+    factory.registerFunction<TableFunctionIcebergLocalCluster>(
+        {.documentation
+         = {.description = R"(The table function can be used to read an Iceberg table stored on shared local storage in parallel across many nodes in a specified cluster.)",
+            .examples{{IcebergLocalClusterDefinition::name, "SELECT * FROM icebergLocalCluster(cluster, filename, format [, compression])", ""}},
+            .category = FunctionDocumentation::Category::TableFunction},
+         .allow_readonly = false});
+
 # if USE_AWS_S3
     factory.registerFunction<TableFunctionIcebergS3Cluster>(
         {.documentation
1 change: 1 addition & 0 deletions src/TableFunctions/TableFunctionObjectStorageCluster.h
@@ -62,6 +62,7 @@ using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDe

 #if USE_AVRO
 using TableFunctionIcebergCluster = TableFunctionObjectStorageCluster<IcebergClusterDefinition, StorageIcebergConfiguration, true>;
+using TableFunctionIcebergLocalCluster = TableFunctionObjectStorageCluster<IcebergLocalClusterDefinition, StorageLocalIcebergConfiguration, true>;
 #endif
 
 #if USE_AVRO && USE_AWS_S3
29 changes: 29 additions & 0 deletions src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
@@ -67,6 +67,13 @@ struct IcebergHDFSClusterFallbackDefinition
     static constexpr auto storage_engine_cluster_name = "IcebergHDFSCluster";
 };
 
+struct IcebergLocalClusterFallbackDefinition
+{
+    static constexpr auto name = "icebergLocal";
+    static constexpr auto storage_engine_name = "Local";
+    static constexpr auto storage_engine_cluster_name = "IcebergLocalCluster";
+};
+
 struct DeltaLakeClusterFallbackDefinition
 {
     static constexpr auto name = "deltaLake";
@@ -163,6 +170,7 @@ using TableFunctionHDFSClusterFallback = TableFunctionObjectStorageClusterFallba

 #if USE_AVRO
 using TableFunctionIcebergClusterFallback = TableFunctionObjectStorageClusterFallback<IcebergClusterFallbackDefinition, TableFunctionIcebergCluster>;
+using TableFunctionIcebergLocalClusterFallback = TableFunctionObjectStorageClusterFallback<IcebergLocalClusterFallbackDefinition, TableFunctionIcebergLocalCluster>;
 #endif
 
 #if USE_AVRO && USE_AWS_S3
@@ -286,6 +294,27 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
             .allow_readonly = false
         }
     );
+
+    factory.registerFunction<TableFunctionIcebergLocalClusterFallback>(
+        {
+            .documentation = {
+                .description = R"(The table function can be used to read an Iceberg table stored on a shared disk, either from a single node or in parallel across many nodes in a specified cluster.)",
+                .examples{
+                    {
+                        "icebergLocal",
+                        "SELECT * FROM icebergLocal(filename)", ""
+                    },
+                    {
+                        "icebergLocal",
+                        "SELECT * FROM icebergLocal(filename) "
+                        "SETTINGS object_storage_cluster='cluster'", ""
+                    },
+                },
+                .category = FunctionDocumentation::Category::TableFunction
+            },
+            .allow_readonly = false
+        }
+    );
 #endif

#if USE_AVRO && USE_AWS_S3
1 change: 0 additions & 1 deletion src/TableFunctions/registerTableFunctions.cpp
@@ -69,7 +69,6 @@ void registerTableFunctions()
     registerTableFunctionObjectStorage(factory);
     registerTableFunctionObjectStorageCluster(factory);
     registerTableFunctionObjectStorageClusterFallback(factory);
-    registerDataLakeTableFunctions(factory);
     registerDataLakeClusterTableFunctions(factory);
 
 #if USE_YTSAURUS
1 change: 0 additions & 1 deletion src/TableFunctions/registerTableFunctions.h
@@ -70,7 +70,6 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & factory);
-void registerDataLakeTableFunctions(TableFunctionFactory & factory);
 void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
 
 void registerTableFunctionTimeSeries(TableFunctionFactory & factory);
50 changes: 37 additions & 13 deletions tests/integration/helpers/iceberg_utils.py
@@ -288,22 +288,26 @@ def get_creation_expression(
         )
 
     elif storage_type == "local":
-        assert not run_on_cluster
-
-        if table_function:
+        if run_on_cluster:
+            assert table_function
             return f"""
-                iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
+                iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
             """
         else:
-            return (
-                f"""
-                DROP TABLE IF EXISTS {table_name};
-                CREATE TABLE {if_not_exists_prefix} {table_name} {schema}
-                ENGINE=Iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
-                {partition_by}
-                {settings_expression}
-                """
-            )
+            if table_function:
+                return f"""
+                    iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
+                """
+            else:
+                return (
+                    f"""
+                    DROP TABLE IF EXISTS {table_name};
+                    CREATE TABLE {if_not_exists_prefix} {table_name} {schema}
+                    ENGINE=Iceberg{engine_part}({storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
+                    {partition_by}
+                    {settings_expression}
+                    """
+                )
 
     else:
         raise Exception(f"Unknown iceberg storage type: {storage_type}")
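For reference, the `run_on_cluster` branch above renders to a table-function expression of roughly this shape (table name is hypothetical, `{storage_arg}` left as a placeholder):

```sql
-- Hypothetical rendering for a table named t1:
SELECT * FROM icebergLocalCluster('cluster_simple', <storage_arg>, path = '/iceberg_data/default/t1/', format = Parquet);
```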
@@ -473,6 +477,17 @@ def default_upload_directory(
raise Exception(f"Unknown iceberg storage type: {storage_type}")


def additional_upload_directory(
started_cluster, node, storage_type, local_path, remote_path, **kwargs
):
if storage_type == "local":
return LocalUploader(started_cluster.instances[node]).upload_directory(
local_path, remote_path, **kwargs
)
else:
raise Exception(f"Unknown iceberg storage type for additional uploading: {storage_type}")


def default_download_directory(
started_cluster, storage_type, remote_path, local_path, **kwargs
):
@@ -485,7 +500,7 @@ def default_download_directory(


 def execute_spark_query_general(
-    spark, started_cluster, storage_type: str, table_name: str, query: str
+    spark, started_cluster, storage_type: str, table_name: str, query: str, additional_nodes=None
 ):
     spark.sql(query)
     default_upload_directory(
@@ -494,8 +509,17 @@ def execute_spark_query_general(
f"/iceberg_data/default/{table_name}/",
f"/iceberg_data/default/{table_name}/",
)
for node in additional_nodes:
additional_upload_directory(
started_cluster,
node,
storage_type,
f"/iceberg_data/default/{table_name}/",
f"/iceberg_data/default/{table_name}/",
)
return


def get_last_snapshot(path_to_table):
import json
import os