Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1933,7 +1933,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
bool first_stage_,
bool second_stage_,
bool only_types,
const FilterDAGInfoPtr & filter_info_,
const FilterDAGInfoPtr & row_policy_info_,
const FilterDAGInfoPtr & additional_filter,
const Block & source_header)
: first_stage(first_stage_)
Expand Down Expand Up @@ -2031,10 +2031,10 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
columns_for_additional_filter.begin(), columns_for_additional_filter.end());
}

if (storage && filter_info_)
if (storage && row_policy_info_)
{
filter_info = filter_info_;
filter_info->do_remove_column = true;
row_policy_info = row_policy_info_;
row_policy_info->do_remove_column = true;
}

if (prewhere_dag_and_flags = query_analyzer.appendPrewhere(chain, !first_stage); prewhere_dag_and_flags)
Expand Down Expand Up @@ -2373,9 +2373,9 @@ std::string ExpressionAnalysisResult::dump() const
ss << "prewhere_info " << prewhere_info->dump() << "\n";
}

if (filter_info)
if (row_policy_info)
{
ss << "filter_info " << filter_info->dump() << "\n";
ss << "filter_info " << row_policy_info->dump() << "\n";
}

if (before_aggregation)
Expand Down
6 changes: 3 additions & 3 deletions src/Interpreters/ExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ struct ExpressionAnalysisResult
NameSet columns_to_remove_after_prewhere;

PrewhereInfoPtr prewhere_info;
FilterDAGInfoPtr filter_info;
FilterDAGInfoPtr row_policy_info;
ConstantFilterDescription prewhere_constant_filter_description;
ConstantFilterDescription where_constant_filter_description;
/// Actions by every element of ORDER BY
Expand All @@ -285,12 +285,12 @@ struct ExpressionAnalysisResult
bool first_stage,
bool second_stage,
bool only_types,
const FilterDAGInfoPtr & filter_info,
const FilterDAGInfoPtr & row_policy_info,
const FilterDAGInfoPtr & additional_filter, /// for setting additional_filters
const Block & source_header);

/// Filter for row-level security.
bool hasFilter() const { return filter_info.get(); }
bool hasRowPolicyFilter() const { return row_policy_info.get(); }

bool hasJoin() const { return join.get(); }
bool hasPrewhere() const { return prewhere_info.get(); }
Expand Down
126 changes: 42 additions & 84 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// Fix source_header for filter actions.
if (row_policy_filter && !row_policy_filter->empty())
{
filter_info = generateFilterActions(
row_policy_info = generateFilterActions(
table_id, row_policy_filter->expression, context, storage, storage_snapshot, metadata_snapshot, required_columns,
prepared_sets);

Expand Down Expand Up @@ -1052,8 +1052,6 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
max_rows = max_rows ? std::min(max_rows, settings[Setting::max_rows_to_read].value) : settings[Setting::max_rows_to_read];
query_info_copy.trivial_limit = max_rows;

/// Apply filters to prewhere and add them to the query_info so we can filter out parts efficiently during row estimation
applyFiltersToPrewhereInAnalysis(analysis_copy);
if (analysis_copy.prewhere_info)
{
query_info_copy.prewhere_info = analysis_copy.prewhere_info;
Expand All @@ -1069,13 +1067,13 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
= query_info_copy.prewhere_info->prewhere_actions.findInOutputs(query_info_copy.prewhere_info->prewhere_column_name);
added_filter_nodes.nodes.push_back(&node);
}
}

if (query_info_copy.prewhere_info->row_level_filter)
{
const auto & node
= query_info_copy.prewhere_info->row_level_filter->findInOutputs(query_info_copy.prewhere_info->row_level_column_name);
added_filter_nodes.nodes.push_back(&node);
}
if (query_info_copy.row_level_filter)
{
const auto & node
= query_info_copy.row_level_filter->actions.findInOutputs(query_info_copy.row_level_filter->column_name);
added_filter_nodes.nodes.push_back(&node);
}

if (auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes))
Expand Down Expand Up @@ -1178,7 +1176,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
&& options.to_stage > QueryProcessingStage::WithMergeableState;

analysis_result = ExpressionAnalysisResult(
*query_analyzer, metadata_snapshot, first_stage, second_stage, options.only_analyze, filter_info, additional_filter_info, source_header);
*query_analyzer, metadata_snapshot, first_stage, second_stage, options.only_analyze, row_policy_info, additional_filter_info, source_header);

if (options.to_stage == QueryProcessingStage::Enum::FetchColumns)
{
Expand Down Expand Up @@ -1621,32 +1619,20 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
auto read_nothing = std::make_unique<ReadNothingStep>(source_header);
query_plan.addStep(std::move(read_nothing));

if (expressions.filter_info)
if (expressions.row_policy_info)
{
auto row_level_security_step = std::make_unique<FilterStep>(
query_plan.getCurrentHeader(),
expressions.filter_info->actions.clone(),
expressions.filter_info->column_name,
expressions.filter_info->do_remove_column);
expressions.row_policy_info->actions.clone(),
expressions.row_policy_info->column_name,
expressions.row_policy_info->do_remove_column);

row_level_security_step->setStepDescription("Row-level security filter");
query_plan.addStep(std::move(row_level_security_step));
}

if (expressions.prewhere_info)
{
if (expressions.prewhere_info->row_level_filter)
{
auto row_level_filter_step = std::make_unique<FilterStep>(
query_plan.getCurrentHeader(),
expressions.prewhere_info->row_level_filter->clone(),
expressions.prewhere_info->row_level_column_name,
true);

row_level_filter_step->setStepDescription("Row-level security filter (PREWHERE)");
query_plan.addStep(std::move(row_level_filter_step));
}

auto prewhere_step = std::make_unique<FilterStep>(
query_plan.getCurrentHeader(),
expressions.prewhere_info->prewhere_actions.clone(),
Expand Down Expand Up @@ -1748,13 +1734,13 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
{
// If there is a storage that supports prewhere, this will always be nullptr
// Thus, we don't actually need to check if projection is active.
if (expressions.filter_info)
if (expressions.row_policy_info && !(!input_pipe && storage && storage->supportsPrewhere()))
{
auto row_level_security_step = std::make_unique<FilterStep>(
query_plan.getCurrentHeader(),
expressions.filter_info->actions.clone(),
expressions.filter_info->column_name,
expressions.filter_info->do_remove_column);
expressions.row_policy_info->actions.clone(),
expressions.row_policy_info->column_name,
expressions.row_policy_info->do_remove_column);

row_level_security_step->setStepDescription("Row-level security filter");
query_plan.addStep(std::move(row_level_security_step));
Expand Down Expand Up @@ -2194,21 +2180,20 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, c
{
Pipe pipe(std::make_shared<NullSource>(source_header));

if (query_info.row_level_filter)
{
auto row_level_actions = std::make_shared<ExpressionActions>(query_info.row_level_filter->actions.clone());
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(header,
row_level_actions,
query_info.row_level_filter->column_name,
query_info.row_level_filter->do_remove_column);
});
}
if (query_info.prewhere_info)
{
auto & prewhere_info = *query_info.prewhere_info;

if (prewhere_info.row_level_filter)
{
auto row_level_actions = std::make_shared<ExpressionActions>(prewhere_info.row_level_filter->clone());
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(header,
row_level_actions,
prewhere_info.row_level_column_name, true);
});
}

auto filter_actions = std::make_shared<ExpressionActions>(prewhere_info.prewhere_actions.clone());
pipe.addSimpleTransform([&](const Block & header)
{
Expand Down Expand Up @@ -2247,38 +2232,9 @@ bool InterpreterSelectQuery::shouldMoveToPrewhere() const
return settings[Setting::optimize_move_to_prewhere] && (!query.final() || settings[Setting::optimize_move_to_prewhere_if_final]);
}

/// Note that this is const and accepts the analysis ref to be able to use it to do analysis for parallel replicas
/// without affecting the final analysis multiple times
void InterpreterSelectQuery::applyFiltersToPrewhereInAnalysis(ExpressionAnalysisResult & analysis) const
{
if (!analysis.filter_info)
return;

if (!analysis.prewhere_info)
{
const bool does_storage_support_prewhere = !input_pipe && storage && storage->supportsPrewhere();
if (does_storage_support_prewhere && shouldMoveToPrewhere())
{
/// Execute row level filter in prewhere as a part of "move to prewhere" optimization.
analysis.prewhere_info = std::make_shared<PrewhereInfo>(std::move(analysis.filter_info->actions), analysis.filter_info->column_name);
analysis.prewhere_info->remove_prewhere_column = std::move(analysis.filter_info->do_remove_column);
analysis.prewhere_info->need_filter = true;
analysis.filter_info = nullptr;
}
}
else
{
/// Add row level security actions to prewhere.
analysis.prewhere_info->row_level_filter = std::move(analysis.filter_info->actions);
analysis.prewhere_info->row_level_column_name = std::move(analysis.filter_info->column_name);
analysis.filter_info = nullptr;
}
}


void InterpreterSelectQuery::addPrewhereAliasActions()
{
applyFiltersToPrewhereInAnalysis(analysis_result);
auto & row_level_filter = analysis_result.row_policy_info;
auto & prewhere_info = analysis_result.prewhere_info;
auto & columns_to_remove_after_prewhere = analysis_result.columns_to_remove_after_prewhere;

Expand All @@ -2305,12 +2261,11 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
/// Get some columns directly from PREWHERE expression actions
auto prewhere_required_columns = prewhere_info->prewhere_actions.getRequiredColumns().getNames();
columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());

if (prewhere_info->row_level_filter)
{
auto row_level_required_columns = prewhere_info->row_level_filter->getRequiredColumns().getNames();
columns.insert(row_level_required_columns.begin(), row_level_required_columns.end());
}
}
if (row_level_filter)
{
auto row_level_required_columns = row_level_filter->actions.getRequiredColumns().getNames();
columns.insert(row_level_required_columns.begin(), row_level_required_columns.end());
}

return columns;
Expand Down Expand Up @@ -2468,13 +2423,15 @@ std::optional<UInt64> InterpreterSelectQuery::getTrivialCount(UInt64 allow_exper

// It's possible to optimize count() given only partition predicates
ActionsDAG::NodeRawConstPtrs filter_nodes;
if (analysis_result.hasRowPolicyFilter())
{
auto & row_level_filter = analysis_result.row_policy_info;
filter_nodes.push_back(&row_level_filter->actions.findInOutputs(row_level_filter->column_name));
}
if (analysis_result.hasPrewhere())
{
auto & prewhere_info = analysis_result.prewhere_info;
filter_nodes.push_back(&prewhere_info->prewhere_actions.findInOutputs(prewhere_info->prewhere_column_name));

if (prewhere_info->row_level_filter)
filter_nodes.push_back(&prewhere_info->row_level_filter->findInOutputs(prewhere_info->row_level_column_name));
}
if (analysis_result.hasWhere())
{
Expand Down Expand Up @@ -2665,10 +2622,11 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
if (max_streams == 0)
max_streams = 1;

auto & prewhere_info = analysis_result.prewhere_info;
if (analysis_result.row_policy_info && (!input_pipe && storage && storage->supportsPrewhere()))
query_info.row_level_filter = analysis_result.row_policy_info;

if (prewhere_info)
query_info.prewhere_info = prewhere_info;
if (analysis_result.prewhere_info)
query_info.prewhere_info = analysis_result.prewhere_info;

bool optimize_read_in_order = analysis_result.optimize_read_in_order;
bool optimize_aggregation_in_order = analysis_result.optimize_read_in_order && !query_analyzer->useGroupingSetKey();
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class InterpreterSelectQuery : public IInterpreterUnionOrSelectQuery
ExpressionAnalysisResult analysis_result;
/// For row-level security.
RowPolicyFilterPtr row_policy_filter;
FilterDAGInfoPtr filter_info;
FilterDAGInfoPtr row_policy_info;

/// For additional_filter setting.
FilterDAGInfoPtr additional_filter_info;
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/getHeaderForProcessingStage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ Block getHeaderForProcessingStage(
case QueryProcessingStage::FetchColumns:
{
Block header = storage_snapshot->getSampleBlockForColumns(column_names);
header = SourceStepWithFilter::applyPrewhereActions(header, query_info.prewhere_info);
header = SourceStepWithFilter::applyPrewhereActions(header, query_info.row_level_filter, query_info.prewhere_info);
return header;
}
case QueryProcessingStage::WithMergeableState:
Expand Down
Loading
Loading