Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions core/translate/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::{
},
plan::{Aggregate, Distinctness, SelectPlan, TableReferences},
result_row::emit_select_result,
subquery::emit_non_from_clause_subquery,
};

/// Emits the bytecode for processing an aggregate without a GROUP BY clause.
Expand Down Expand Up @@ -125,6 +126,21 @@ pub fn emit_ungrouped_aggregation<'a>(
}
}
}
// Emit deferred correlated subqueries so that e.g. (SELECT COUNT(*) FROM t2 WHERE
// t2.a = t1.a) returns 0 instead of NULL when the outer loop produced no rows.
// Cursors are in NullRow state, so correlated column refs resolve to NULL and
// COUNT(*) correctly returns 0.
for (subquery_plan, query_type, is_correlated) in
t_ctx.deferred_ungrouped_agg_subqueries.drain(..)
{
emit_non_from_clause_subquery(
program,
&t_ctx.resolver,
*subquery_plan,
&query_type,
is_correlated,
)?;
}
// Evaluate non-aggregate columns now (with cursor in invalid state, columns return NULL)
// Must use no_constant_opt to prevent constant hoisting which would place the label
// after the hoisted constants, causing infinite loops in compound selects.
Expand Down
35 changes: 34 additions & 1 deletion core/translate/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::translate::fkeys::{
};
use crate::translate::plan::{
DeletePlan, EphemeralRowidMode, EvalAt, IndexMethodQuery, JoinedTable, NonFromClauseSubquery,
Plan, QueryDestination, ResultSetColumn, Search,
Plan, QueryDestination, ResultSetColumn, Search, SubqueryState,
};
use crate::translate::planner::ROWID_STRS;
use crate::translate::planner::{table_mask_from_expr, TableMask};
Expand Down Expand Up @@ -434,6 +434,10 @@ pub struct TranslateCtx<'a> {
pub cdc_cursor_id: Option<usize>,
pub meta_window: Option<WindowMetadata<'a>>,
pub unsafe_testing: bool,
/// Cloned subquery plans for correlated subqueries in ungrouped aggregate queries.
/// When no rows match the outer loop, these are emitted in the fallback path
/// so that e.g. COUNT(*) correctly returns 0 instead of NULL.
pub deferred_ungrouped_agg_subqueries: Vec<(Box<SelectPlan>, SubqueryType, bool)>,
}

impl<'a> TranslateCtx<'a> {
Expand Down Expand Up @@ -463,6 +467,7 @@ impl<'a> TranslateCtx<'a> {
cdc_cursor_id: None,
meta_window: None,
unsafe_testing,
deferred_ungrouped_agg_subqueries: Vec::new(),
}
}
}
Expand Down Expand Up @@ -1422,6 +1427,34 @@ pub fn emit_query<'a>(
return Ok(t_ctx.reg_result_cols_start.unwrap());
}

// For ungrouped aggregates with correlated subqueries in non-aggregate result columns,
// clone the subquery plans before open_loop consumes them. If the outer loop produces
// no rows, we re-emit these subqueries in the fallback path so that e.g.
// (SELECT COUNT(*) FROM t2 WHERE t2.a = t1.a) returns 0 instead of NULL.
// Skip when contains_constant_false_condition: the Goto skips cursor initialization,
// so deferred subqueries would panic accessing unopened cursors.
if has_ungrouped_nonagg_cols && !plan.contains_constant_false_condition {
for subquery in plan.non_from_clause_subqueries.iter() {
if !subquery.correlated || subquery.has_been_evaluated() {
continue;
}
let eval_at = subquery.get_eval_at(&plan.join_order, Some(&plan.table_references))?;
if !matches!(eval_at, EvalAt::Loop(_)) {
continue;
}
if let SubqueryState::Unevaluated {
plan: Some(subquery_plan),
} = &subquery.state
{
t_ctx.deferred_ungrouped_agg_subqueries.push((
subquery_plan.clone(),
subquery.query_type.clone(),
subquery.correlated,
));
}
}
}

// Set up main query execution loop
open_loop(
program,
Expand Down
3 changes: 3 additions & 0 deletions core/translate/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,7 @@ pub fn emit_from_clause_subquery(
materialized_build_inputs: HashMap::default(),
hash_table_contexts: HashMap::default(),
unsafe_testing: t_ctx.unsafe_testing,
deferred_ungrouped_agg_subqueries: Vec::new(),
};
emit_query(program, select_plan, &mut metadata)?
}
Expand Down Expand Up @@ -1341,6 +1342,7 @@ fn emit_materialized_cte(
materialized_build_inputs: HashMap::default(),
hash_table_contexts: HashMap::default(),
unsafe_testing: t_ctx.unsafe_testing,
deferred_ungrouped_agg_subqueries: Vec::new(),
};
emit_query(program, select_plan, &mut metadata)?;
}
Expand Down Expand Up @@ -1432,6 +1434,7 @@ fn emit_indexed_materialized_subquery(
materialized_build_inputs: HashMap::default(),
hash_table_contexts: HashMap::default(),
unsafe_testing: t_ctx.unsafe_testing,
deferred_ungrouped_agg_subqueries: Vec::new(),
};
emit_query(program, select_plan, &mut metadata)?;
}
Expand Down
52 changes: 52 additions & 0 deletions sql_generation/generation/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,58 @@ impl Arbitrary for SelectFree {

impl Arbitrary for Select {
fn arbitrary<R: Rng + ?Sized, C: GenerationContext>(rng: &mut R, env: &C) -> Self {
// Generate a correlated subquery with COUNT(*) when 2+ tables exist.
// Disabled for now: triggers a pre-existing cursor bug ("cursor id 1 is None")
// in the Turso runtime when correlated subqueries run on connections that have
// previously executed DDL. Enable once that bug is fixed.
// See: Select::count_with_correlated_subquery() and Predicate::count_star()
if env.tables().len() >= 2 && rng.random_bool(0.0) {
let tables = env.tables();
let outer_table = pick(tables, rng);
let inner_candidates: Vec<&Table> = tables
.iter()
.filter(|t| t.name != outer_table.name)
.collect();
let inner_table = *pick(&inner_candidates, rng);

// Try to find a shared column name for correlation
let outer_col_names: IndexSet<&str> = outer_table
.columns
.iter()
.map(|c| c.name.as_str())
.collect();
let shared: Vec<&str> = inner_table
.columns
.iter()
.map(|c| c.name.as_str())
.filter(|n| outer_col_names.contains(n))
.collect();

let (outer_corr_col, inner_corr_col) = if !shared.is_empty() {
let col = (*pick(&shared, rng)).to_owned();
(col.clone(), col)
} else {
let oc = pick(&outer_table.columns, rng).name.clone();
let ic = pick(&inner_table.columns, rng).name.clone();
(oc, ic)
};

// Use WHERE TRUE to avoid generating constant-false conditions (e.g. NOT <literal>)
// that the optimizer folds into contains_constant_false_condition, which skips
// cursor initialization and hits a different (pre-existing) code path.
// The correlated subquery bug is exercised via data: when correlation columns
// don't match between tables, the inner COUNT(*) must return 0 not NULL.
let where_clause = Predicate::true_();

return Select::count_with_correlated_subquery(
outer_table.name.clone(),
inner_table.name.clone(),
outer_corr_col,
inner_corr_col,
where_clause,
);
}

// Generate a number of selects based on the query size
let opts = &env.opts().query.select;
let num_compound_selects = opts.compound_selects
Expand Down
34 changes: 34 additions & 0 deletions sql_generation/model/query/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use turso_parser::ast::{
self,
fmt::{BlankContext, ToTokens},
FunctionTail,
};

use crate::model::table::{SimValue, Table, TableContext};
Expand Down Expand Up @@ -83,6 +84,39 @@ impl Predicate {
Self(ast::Expr::Literal(val.into()))
}

/// Builds the aggregate expression `COUNT(*)`.
///
/// The resulting `FunctionCallStar` node carries an empty function tail:
/// both `filter_clause` and `over_clause` are `None`.
pub fn count_star() -> Self {
    let name = ast::Name::from_string("COUNT");
    let empty_tail = FunctionTail {
        filter_clause: None,
        over_clause: None,
    };
    let expr = ast::Expr::FunctionCallStar {
        name,
        filter_over: empty_tail,
    };
    Self(expr)
}

/// Wraps a generated `Select` as a subquery expression.
///
/// The select is first converted with `to_sql_ast()` and the result is
/// stored in an `ast::Expr::Subquery` node.
pub fn subquery(select: super::select::Select) -> Self {
    let lowered = select.to_sql_ast();
    Self(ast::Expr::Subquery(lowered))
}

/// Builds a qualified column reference such as `table.column` or
/// `db.table.column`.
///
/// `table` is split at its first `.`: when a dot is present (e.g. an
/// attached-database name like `aux0.tablename`) the part before it is used
/// as the database name and the remainder as the table name, producing a
/// `DoublyQualified` expression. Otherwise a plain `Qualified` expression is
/// produced.
pub fn qualified_column(table: &str, column: &str) -> Self {
    let column_name = ast::Name::from_string(column);
    let expr = match table.split_once('.') {
        // "db.tbl" -> db.tbl.column
        Some((db, tbl)) => ast::Expr::DoublyQualified(
            ast::Name::from_string(db),
            ast::Name::from_string(tbl),
            column_name,
        ),
        // "tbl" -> tbl.column
        None => ast::Expr::Qualified(ast::Name::from_string(table), column_name),
    };
    Self(expr)
}

pub fn parens(self) -> Self {
let expr = ast::Expr::Parenthesized(vec![Box::new(self.0)]);
Self(expr)
Expand Down
93 changes: 88 additions & 5 deletions sql_generation/model/query/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,48 @@ impl Select {
}
}

/// Builds an ungrouped aggregate query whose second result column is a
/// correlated `COUNT(*)` subquery:
///
/// ```sql
/// SELECT COUNT(*),
///        (SELECT COUNT(*) FROM inner_table
///          WHERE inner_table.inner_corr_col = outer_table.outer_corr_col)
/// FROM outer_table WHERE where_clause
/// ```
///
/// The outer select has no joins, no `ORDER BY`, no compound arms and no
/// limit.
pub fn count_with_correlated_subquery(
    outer_table: String,
    inner_table: String,
    outer_corr_col: String,
    inner_corr_col: String,
    where_clause: Predicate,
) -> Self {
    // Predicate tying the inner table's column to the outer table's column.
    let correlation = Predicate::eq(
        Predicate::qualified_column(&inner_table, &inner_corr_col),
        Predicate::qualified_column(&outer_table, &outer_corr_col),
    );

    // Inner `SELECT COUNT(*) FROM inner_table WHERE <correlation>`.
    let counting_subquery = Select::single(
        inner_table,
        vec![ResultColumn::Expr(Predicate::count_star())],
        correlation,
        None,
        Distinctness::All,
    );

    let columns = vec![
        ResultColumn::Expr(Predicate::count_star()),
        ResultColumn::Expr(Predicate::subquery(counting_subquery)),
    ];
    let from = FromClause {
        table: SelectTable::Table(outer_table),
        joins: Vec::new(),
    };
    let outer = SelectInner {
        distinctness: Distinctness::All,
        columns,
        from: Some(from),
        where_clause,
        order_by: None,
    };

    Select {
        body: SelectBody {
            select: Box::new(outer),
            compounds: Vec::new(),
        },
        limit: None,
    }
}

pub fn compound(left: Select, right: Select, operator: CompoundOperator) -> Self {
let mut body = left.body;
body.compounds.push(CompoundSelect {
Expand All @@ -107,13 +149,20 @@ impl Select {
}

pub fn dependencies(&self) -> IndexSet<String> {
if self.body.select.from.is_none() {
return IndexSet::new();
}
let from = self.body.select.from.as_ref().unwrap();
let mut tables = IndexSet::new();

tables.extend(from.dependencies());
if let Some(from) = &self.body.select.from {
tables.extend(from.dependencies());
}

// Collect tables referenced by subquery expressions in result columns
for col in &self.body.select.columns {
if let ResultColumn::Expr(pred) = col {
if let ast::Expr::Subquery(sub_select) = &pred.0 {
collect_subquery_tables(sub_select, &mut tables);
}
}
}

for compound in &self.body.compounds {
tables.extend(
Expand Down Expand Up @@ -188,6 +237,40 @@ pub enum SelectTable {
Select(Select),
}

/// Records the tables read by an `ast::Select` into `tables` (used for
/// subquery dependency tracking).
///
/// Only the leading `OneSelect::Select` with a `FROM` clause is inspected,
/// and within it only the clause's primary `select` source; any other shape
/// leaves `tables` untouched.
// NOTE(review): other fields of the FROM clause and of the select body are
// ignored here — confirm whether joined or compound sources also need tracking.
fn collect_subquery_tables(select: &ast::Select, tables: &mut IndexSet<String>) {
    let from_clause = match &select.body.select {
        ast::OneSelect::Select {
            from: Some(from_clause),
            ..
        } => from_clause,
        _ => return,
    };
    collect_select_table_names(&from_clause.select, tables);
}

/// Records the table names referenced by an `ast::SelectTable` into `tables`.
///
/// * A plain table contributes its name, prefixed with `db.` when the
///   qualified name carries a database component (e.g. `aux0.tablename`), so
///   the entry matches how the simulator tracks attached-database tables.
/// * A nested select is walked via `collect_subquery_tables`.
/// * Every other variant contributes nothing.
fn collect_select_table_names(select_table: &ast::SelectTable, tables: &mut IndexSet<String>) {
    if let ast::SelectTable::Table(qname, _, _) = select_table {
        let table = qname.name.as_str();
        // Rebuild the full "db.table" form when a database prefix exists.
        let full_name = qname.db_name.as_ref().map_or_else(
            || table.to_owned(),
            |db| format!("{}.{}", db.as_str(), table),
        );
        tables.insert(full_name);
    } else if let ast::SelectTable::Select(sub, _) = select_table {
        collect_subquery_tables(sub, tables);
    }
}

/// Convert a table name string to a QualifiedName, handling attached DB prefixes.
/// Names like "aux0.t1" become `QualifiedName::fullname("aux0", "t1")`,
/// while plain names like "t1" become `QualifiedName::single("t1")`.
Expand Down
Loading
Loading