[CONNECT][SDP][SPARK-56249] Create AUTO CDC syntax for SCD Type 1 in SQL#56419
anew wants to merge 16 commits into
szehon-ho left a comment
A few review notes on the new AUTO CDC SQL parsing — mostly around the source relation handling, converging on the existing ChangeArgs/AutoCdcFlow model used by the Python/Connect path, and some consistency/cleanliness nits. Details inline.
    params, "SEQUENCE BY", "AUTO CDC INTO")
    }

    val sourceTable = plan(params.source.relationPrimary)
plan(params.source.relationPrimary) only consumes the relationPrimary of source=relation. Since relation : LATERAL? relationPrimary relationExtension*, any join/pivot/unpivot/LATERAL on the source is parsed and then silently discarded. For example, both of these parse fine but use only source:
    -- JOIN dropped; only `source` is read
    CREATE FLOW f AS AUTO CDC INTO target
    FROM source JOIN dim ON source.id = dim.id
    KEYS (id) SEQUENCE BY ts

    -- PIVOT dropped; only `source` is read
    CREATE FLOW f AS AUTO CDC INTO target
    FROM source PIVOT (sum(amt) FOR region IN ('US','EU'))
    KEYS (id) SEQUENCE BY ts

Either restrict the grammar to source=relationPrimary so the extra syntax is a parse error, or build from the full relation. Also note the Connect path models the source as a streaming relation (UnresolvedRelation(isStreaming = true)); here it's a plain relation with no streaming marker, so resolution semantics will diverge from the Python/Connect path.
We only want the source itself. Let me fix this.
    /**
     * Command parsed from `CREATE STREAMING TABLE <name> FLOW AUTO CDC ...` SQL syntax.
     * This command serves as a parse-time placeholder for a pipeline CDC definition and cannot be
     * executed directly. It is interpreted by the pipeline submodule during a pipeline execution.
This says the node "is interpreted by the pipeline submodule during a pipeline execution," but there's currently no handler: SqlGraphRegistrationContext.processSqlQuery has no case for CreateStreamingTableAutoCdc (it hits the unsupportedLogicalPlan branch), and CreateFlowHandler only accepts InsertIntoStatement. So this SQL parses but always fails at registration today. Please soften to something like "will be interpreted once execution support is added" (and link the follow-up), or add the handling. Same wording applies to AutoCdcIntoCommand.
OK, I thought this would be ok since I will follow up with that in the next PR.
    checkDuplicateClauses(params.autoCdcColumnsClause(), "COLUMNS", params)

    if (params.autoCdcSequenceByClause().isEmpty) {
      throw QueryParsingErrors.missingClausesForOperation(
The Connect path reports specific conditions for the same mistakes — AUTOCDC_MISSING_SEQUENCE_BY, AUTOCDC_MISSING_SOURCE, AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST, AUTOCDC_NON_COLUMN_IDENTIFIER (see PipelinesHandler.buildAutoCdcFlow). Here a missing SEQUENCE BY raises MISSING_CLAUSES_FOR_OPERATION instead, so the same error surfaces a different condition depending on whether the user came via SQL or Python. Could we reuse the AUTOCDC_* conditions (at minimum for missing sequence-by) for consistency across front-ends?
    APPLY: 'APPLY';
    APPROX: 'APPROX';
    ARCHIVE: 'ARCHIVE';
    AUTO: 'AUTO';
AUTO is out of alphabetical order here (between ARCHIVE and ARRAY); it should sit after AUTHORIZATION. Same issue for SEQUENCE (between SCHEMA and SCHEMAS), and in nonReserved / the sql-ref-ansi-compliance.md table (where AUTO is listed before ATOMIC). The generated .sql.out golden files are already correctly sorted — just the hand-maintained lists are off. Minor, but worth fixing since you already have an ordering-fix commit.
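Since these lists are maintained by hand, a small ordering check can catch this kind of slip. The following is only a sketch: `KeywordOrderCheck` and `outOfOrder` are hypothetical names, not part of Spark, and the keyword slices are abbreviated from the lists discussed above (intervening keywords are omitted).

```scala
// Hypothetical helper, not part of Spark: reports adjacent entries that break
// ascending (lexicographic) order in a hand-maintained keyword list.
object KeywordOrderCheck {
  /** Returns each (previous, current) pair where current sorts before previous. */
  def outOfOrder(keywords: Seq[String]): Seq[(String, String)] =
    keywords.zip(keywords.drop(1)).filter { case (prev, cur) => cur.compareTo(prev) < 0 }

  def main(args: Array[String]): Unit = {
    // Abbreviated slice in the order described in the comment above:
    // AUTO sits between ARCHIVE and ARRAY.
    val asInDiff = Seq("APPLY", "APPROX", "ARCHIVE", "AUTO", "ARRAY")
    println(outOfOrder(asInDiff))

    // Same slice with AUTO moved after AUTHORIZATION (other keywords omitted).
    val fixed = Seq("APPLY", "APPROX", "ARCHIVE", "ARRAY", "AUTHORIZATION", "AUTO")
    println(outOfOrder(fixed))
  }
}
```

The check only compares neighbours, so it flags the first entry after a misplaced keyword rather than the misplaced keyword itself; that is usually enough to locate the problem in a review.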
Good catch, will do. (It is amazing how many mistakes Claude makes when sorting :)
    // Error cases: duplicate clauses
    // ---------------------------------------------------------------------------

    test("duplicate SEQUENCE BY clause") {
These intercept[ParseException] cases (duplicate SEQUENCE BY/APPLY AS DELETE/COLUMNS, "standalone", APPLY AS TRUNCATE, etc.) don't assert the cause. E.g. the TRUNCATE case fails only because TRUNCATE isn't in the grammar, not because of a deliberate rejection — so it'd still "pass" if the parser broke for an unrelated reason. Suggest asserting getCondition (e.g. DUPLICATE_CLAUSES for the duplicate cases), like the missing-SEQUENCE BY test above already does.
Added for the errors where we explicitly throw an exception.
For the cases that the parser already catches, it's hard to predict what the error code would be.
OK, for those cases, I can just assert PARSE_SYNTAX_ERROR
I added all assertions, but note that now we check syntax errors for syntaxes that do not exist yet :)
    assert(cmd.exceptCols.map(_.name) == Seq("op", "ts"))
    }

    test("CREATE STREAMING TABLE FLOW AUTO CDC - all clauses combined") {
visitCreateStreamingTableAutoCdc rejects MATERIALIZED VIEW, bucketing, options, serde, location, and column constraints via operationNotAllowed, but none of those branches are covered here. Please add at least the MV-rejection case (and ideally column-constraints), since they're easy to regress silently.
    val ifNotExists = headerCtx.EXISTS() != null
    val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
    val (colDefs, colConstraints) = Option(ctx.tableElementList()).map(visitTableElementList)
This whole prelude — the colConstraints check, the visitCreateTableClauses destructure, the partitioning computation, the bucketSpec/options/serde/location guards, and the TableSpec(...) block — is duplicated almost verbatim from visitCreatePipelineDataset below. Suggest extracting a shared private helper (e.g. returning (colDefs, partitioning, spec, ifNotExists, tableIdent)) and calling it from both, so they can't drift.
Note the copy also quietly shortened the error messages (e.g. dropping the "Please remove any CHECK, UNIQUE, PK, and FK constraints..." guidance, and collapsing the STORED AS vs Hive-SerDe distinction). Factoring out the helper would keep these consistent with the existing CREATE STREAMING TABLE messages.
    |APPLY AS DELETE WHEN b = 2
    |SEQUENCE BY ts""".stripMargin)
    }
    assert(e.getCondition == "DUPLICATE_CLAUSES")
nit: Worth checking the specific clause being duplicated; it would be easy to end up with a bug pointing to the wrong one if we add more clauses in the future.
Good point, but with the latest change this can no longer happen.
AnishMahto left a comment
Flushing out comments for API and data classes, which look mostly good to me! Will review parser logic/validations soon, but no need to block on me.
    "message" : [
      "Missing required clause(s) <clauses> for operation <operation>."
    ],
    "sqlState" : "42601"
To me this sounds like a semantic error, not a syntactic one; maybe use 42613 or something else?
This is now obsolete; with the fixed clause order it cannot happen any more.
    | createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? tableProvider?
        createTableClauses
        FLOW autoCdcBody    #createStreamingTableAutoCdc
Did we consider collapsing createStreamingTableAutoCdc with createPipelineDataset using | for the query clause?
I.e., is there a good reason for duplicating the shared createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? tableProvider? createTableClauses prefix?
    (autoCdcDeleteClause
    | autoCdcSequenceByClause
    | autoCdcColumnsClause
    )*
Is * the right expression to use here? It implies there can be multiple SEQUENCE BY clauses, for example, and pushes validation down to the AST-building logic rather than rejecting the input at the syntax level.
    sequenceByExpr: Expression,
    specifiedCols: Seq[UnresolvedAttribute],
    exceptCols: Seq[UnresolvedAttribute]
    ) extends LeafCommand {
I forget, does LeafCommand imply the command has no child logical plans? Any chance we should be using UnaryCommand given the sourceTable child?
    },
    "sqlState" : "0A000"
    },
    "MISSING_CLAUSES_FOR_OPERATION" : {
Is this actually used anywhere?
Interestingly, this is used by the parser for METRIC VIEWS now, so I can't remove it.
Confirmed it should stay, though not for AUTO CDC: on current master QueryParsingErrors.missingClausesForOperation is referenced by the METRIC VIEW path (SparkSqlParser), but MISSING_CLAUSES_FOR_OPERATION is not actually defined in error-conditions.json on master -- so this entry is what backs that error. Probably cleaner to land it as its own small fix since it is orthogonal to AUTO CDC.
I will send a separate PR for this
    |KEYS (id)
    |SEQUENCE BY ts""".stripMargin)
    }
    assert(e.getMessage.contains("AUTO CDC is only supported for STREAMING TABLE"))
nit: here and in other tests, use checkError and assert on the expected error conditions and message parameters.
AnishMahto left a comment
Final comments, after taking a look at the AST builder logic!
    LogicalPlan,
    Seq[UnresolvedAttribute],
    Option[Expression],
    Expression,
    Seq[UnresolvedAttribute],
    Seq[UnresolvedAttribute]) =
optional: Might be nice to put this in a case class before returning, and then AutoCdcIntoCommand can also just hold a reference to said case class. Or maybe even just directly return an AutoCdcIntoCommand from this function.
I generally don't like returning a tuple longer than 2-3 elements; it's hard to keep track of position <-> semantic value mentally.
+1, same reaction. A 6-element tuple destructured at two call sites is easy to misorder; a small AutoCdcParams case class (or returning AutoCdcIntoCommand directly) would be clearer.
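A rough sketch of what the case-class version could look like. To keep it compilable without Spark on the classpath, `Plan`, `Expr` and `Attr` are string stand-ins for LogicalPlan, Expression and UnresolvedAttribute. The field names `source`, `keys` and `deleteCondition` are illustrative guesses; `specifiedCols` and `exceptCols` follow the names in the diff. Treat this as an assumption about the shape, not the final API.

```scala
// Stand-in types so the sketch compiles without Spark; in the PR these would be
// LogicalPlan, Expression and UnresolvedAttribute respectively.
object AutoCdcParamsSketch {
  type Plan = String
  type Expr = String
  type Attr = String

  // Named fields replace the positional 6-element tuple.
  final case class AutoCdcParams(
      source: Plan,
      keys: Seq[Attr],
      deleteCondition: Option[Expr],
      sequenceBy: Expr,
      specifiedCols: Seq[Attr],
      exceptCols: Seq[Attr])

  // Hypothetical parse result; the values are made up for illustration.
  def parse(): AutoCdcParams = AutoCdcParams(
    source = "src",
    keys = Seq("id"),
    deleteCondition = Some("op = 'D'"),
    sequenceBy = "ts",
    specifiedCols = Nil,
    exceptCols = Seq("op", "ts"))

  def main(args: Array[String]): Unit = {
    val params = parse()
    // Call sites read fields by name, so swapping two Seq[Attr] values by
    // position is no longer possible.
    println(s"keys=${params.keys}, sequenceBy=${params.sequenceBy}")
    println(s"exceptCols=${params.exceptCols}")
  }
}
```

With three of the six tuple elements sharing the type Seq[UnresolvedAttribute], the compiler cannot catch a misordered destructure; named fields make that class of mistake visible at the call site.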
    Seq[UnresolvedAttribute]) =
    withOrigin(params) {
      val sourceTable = plan(params.source) match {
        case r: UnresolvedRelation => r.copy(isStreaming = true)
Instead of silently forcing the unresolved relation to be streaming, can we just throw if the user tries specifying a non-streaming relation? The error can guide users to use STREAM.
+1. Silently coercing to isStreaming = true (and the case other => other passthrough) hides cases the contract does not actually allow. I'd throw a clear error for a non-streaming/non-identifier source and point users at STREAM(...), matching the Connect/Python path which models the source as a streaming relation.
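To make the suggestion concrete, here is a minimal sketch of the stricter match. It uses a tiny stand-in ADT (`Relation`, `OtherPlan`) instead of Spark's UnresolvedRelation and LogicalPlan, and returns an Either rather than throwing so it stays self-contained; the real change would presumably raise a ParseException with a proper error condition. All names and message wording here are hypothetical.

```scala
// Stand-in plan types; not Spark classes. Relation mimics the two fields of
// UnresolvedRelation that matter here.
object StreamingSourceCheck {
  sealed trait Plan
  final case class Relation(nameParts: Seq[String], isStreaming: Boolean) extends Plan
  final case class OtherPlan(description: String) extends Plan

  /** Accepts only a streaming relation; every other source is rejected with guidance. */
  def requireStreamingSource(plan: Plan): Either[String, Relation] = plan match {
    case r: Relation if r.isStreaming => Right(r)
    case r: Relation =>
      val name = r.nameParts.mkString(".")
      Left(s"AUTO CDC source '$name' must be a streaming read; use STREAM($name).")
    case OtherPlan(description) =>
      Left(s"AUTO CDC source must be STREAM(<table>), but got: $description.")
  }

  def main(args: Array[String]): Unit = {
    println(requireStreamingSource(Relation(Seq("src"), isStreaming = true)))
    println(requireStreamingSource(Relation(Seq("src"), isStreaming = false)))
    println(requireStreamingSource(OtherPlan("inline VALUES")))
  }
}
```

The point of the sketch is that there is no `case other => other` fallthrough: anything that is not an explicitly streaming relation produces an error that names STREAM(...).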
changed this to always require STREAM()
    withOrigin(params) {
      val sourceTable = plan(params.source) match {
        case r: UnresolvedRelation => r.copy(isStreaming = true)
        case other => other
Should we invalidate against all other logical plans?
I don't know how this concretely maps against logical plan subclasses, but at the syntax level we should only be allowing source to be a multipart identifier, or STREAM(multipart identifier).
I think both of these will map directly to an UnresolvedRelation. And then can we take this a step further, where AutoCdcIntoCommand requires source to be a TableIdentifier/Identifier rather than an arbitrary LogicalPlan? Then I guess AutoCdcIntoCommand can go back to being a leaf plan type.
Eventually in SDP we expect source to simply be a string identifier:
Agree. Since source=relationPrimary, case other => other currently accepts subqueries, inline VALUES, TVFs, etc. as the source child and passes them through unmarked. Given SDP expects source to be a plain identifier (optional string source), I'd restrict this to a (optionally STREAM-wrapped) table identifier and reject the rest. That also lets AutoCdcIntoCommand carry a TableIdentifier and go back to a leaf, as you noted.
Should I then change the syntax to only accept a table identifier?
     *
     * @return (colDefs, partitioning, spec, ifNotExists, tableIdent)
     */
    private def parsePipelineDatasetPrelude(
Do we still need this function now that the same createPipelineDataset rule is hit for both AS query and FLOW AUTO CDC sub-rules?
+1 -- after the grammar merge removed visitCreateStreamingTableAutoCdc, this helper has a single caller (visitCreatePipelineDataset), and all of its params are just forwarded from that one ctx. The dedup rationale is gone, so I'd inline it back to avoid the extra 6-param/5-tuple indirection.
    autoCdcParameters
        : FROM source=relationPrimary
          KEYS LEFT_PAREN keys=multipartIdentifierList RIGHT_PAREN
For keys and the column selection clause, we don't actually allow multipart identifiers as part of the AutoCDC API contract.
Should we use identifier instead of multipartIdentifierList to reflect this? Either way SDP will invalidate multipart identifiers, but I figure we should make it true by construction where possible.
+1 -- keys and the COLUMNS list only allow simple column identifiers per the contract, so multipartIdentifierList lets e.g. KEYS (a.b.c) parse and pushes the check downstream. Using identifierSeq/identifierList makes it correct by construction.
Add SQL syntax for AUTO CDC (Change Data Capture) with SCD Type 1 semantics. Two forms are supported:
1. CREATE FLOW <name> AS AUTO CDC INTO <target> FROM <source> KEYS (...) SEQUENCE BY <expr>
2. CREATE [OR REFRESH] STREAMING TABLE <name> FLOW AUTO CDC FROM <source> KEYS (...) SEQUENCE BY <expr>
Optional clauses: APPLY AS DELETE WHEN, COLUMNS, COLUMNS * EXCEPT.
Co-authored-by: Isaac
…oCdcParserSuite Co-authored-by: Isaac
…and ANSI compliance doc Add APPLY, AUTO, CDC, and SEQUENCE as non-reserved keywords in both ANSI and default modes, and document them in sql-ref-ansi-compliance.md. Co-authored-by: Isaac
… keyword lists Regenerate SQL golden files and add APPLY, AUTO, CDC, SEQUENCE to the hardcoded keyword lists in ThriftServer and SparkConnect tests. Co-authored-by: Isaac
…x from AUTO CDC The createPipelineDatasetHeader grammar rule inadvertently allowed CREATE OR REFRESH for streaming tables and materialized views. This was not intended for the AUTO CDC syntax. Remove the optional (OR REFRESH)? from the grammar and the corresponding orRefresh field from CreateStreamingTableAutoCdc. Co-authored-by: Isaac
…parsing
- Restrict grammar source to relationPrimary to reject JOINs/PIVOTs
- Mark source relation as streaming (isStreaming = true) for consistency with Connect/Python path
- Use AUTOCDC_MISSING_SEQUENCE_BY and AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST error conditions for consistency across front-ends
- Fix keyword ordering: AUTO after AUTHORIZATION, SEQUENCE after SEPARATED in lexer, parser rules, and ANSI compliance doc
- Extract parsePipelineDatasetPrelude helper to deduplicate visitCreateStreamingTableAutoCdc and visitCreatePipelineDataset
- Soften execution doc wording for CreateStreamingTableAutoCdc and AutoCdcIntoCommand
- Add tests for MV rejection, column constraints, bucketing, options, serde, location, and assert error conditions on all error cases
Co-authored-by: Isaac
…yntax Change the AUTO CDC grammar from a repeating alternation to a fixed order: APPLY AS DELETE (optional) → SEQUENCE BY (required) → COLUMNS (optional). Remove the now-unnecessary duplicate clause and mutual exclusion checks from the parser. Co-authored-by: Isaac
…eatePipelineDataset grammar rule Unify the two grammar alternatives for pipeline dataset creation into a single createPipelineDataset rule with an optional (AS query | FLOW autoCdcBody) tail. Remove the separate visitCreateStreamingTableAutoCdc visitor and handle all three streaming table forms (CTAS, bare, AutoCDC) in visitCreatePipelineDataset. Also change AutoCdcIntoCommand from LeafCommand to UnaryCommand since it has sourceTable as a child plan. Co-authored-by: Isaac
…r tests Replace plain assert checks with checkError for all error test cases, validating condition, sqlState, parameters, and queryContext. Co-authored-by: Isaac
…OLUMNS to simple identifiers
Tighten the AUTO CDC grammar and AST building:
- The AUTO CDC source now only accepts STREAM(multipartIdentifier); the bare multipart-identifier form is removed. The source is always returned as an UnresolvedRelation marked as a streaming read (isStreaming = true).
- KEYS and COLUMNS now only accept simple identifiers (identifierSeq) instead of multipart identifiers, and are built as single-part UnresolvedAttributes.
- Inline the pipeline-dataset prelude into visitCreatePipelineDataset and use the AutoCdcParams case class for parameter passing.
Adds parser tests covering the STREAM requirement and rejection of multipart identifiers in KEYS and COLUMNS for both AUTO CDC variants.
Co-authored-by: Isaac
Move the MISSING_CLAUSES_FOR_OPERATION error-condition definition out of this branch; it belongs to a separate change and will be submitted in its own PR. The QueryParsingErrors.missingClausesForOperation helper and its SparkSqlParser callers are already on master (from the METRIC VIEW work) and are unaffected. Co-authored-by: Isaac
Rename the AUTO CDC column fields specifiedCols/exceptCols to includeColumns/excludeColumns and change them from Seq to Option[Seq[UnresolvedAttribute]], so an absent COLUMNS clause (None) is distinguishable from an empty list. The rename and Option type are propagated through AutoCdcParams, AutoCdcIntoCommand, CreateStreamingTableAutoCdc, SparkSqlParser, and the parser tests. Co-authored-by: Isaac
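The None-versus-empty distinction from this commit can be sketched as follows. This is an illustration only: `Attr` is a string stand-in for UnresolvedAttribute, `ColumnSelection` and `describe` are hypothetical names, and whether the grammar can actually produce an empty list is not something this sketch assumes.

```scala
// Stand-in sketch, not the PR's code: shows how Option[Seq[...]] separates
// "clause absent" from "clause present with some list".
object ColumnSelectionSketch {
  type Attr = String // stand-in for UnresolvedAttribute

  final case class ColumnSelection(
      includeColumns: Option[Seq[Attr]],
      excludeColumns: Option[Seq[Attr]])

  def describe(sel: ColumnSelection): String = sel match {
    case ColumnSelection(None, None) => "COLUMNS clause absent"
    case ColumnSelection(Some(cols), None) => s"COLUMNS (${cols.mkString(", ")})"
    case ColumnSelection(None, Some(cols)) => s"COLUMNS * EXCEPT (${cols.mkString(", ")})"
    case ColumnSelection(Some(_), Some(_)) => "invalid: both lists given"
  }

  def main(args: Array[String]): Unit = {
    println(describe(ColumnSelection(None, None)))
    println(describe(ColumnSelection(Some(Seq("id", "name")), None)))
    println(describe(ColumnSelection(None, Some(Seq("op", "ts")))))
    // With plain Seq fields, this case would be indistinguishable from "absent".
    println(describe(ColumnSelection(Some(Nil), None)))
  }
}
```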
Make the AUTO CDC source relation a child of the command nodes so the analyzer resolves it through the normal plan resolution path, mirroring MERGE INTO and the sibling CreatePipelineDatasetAsSelect: - AutoCdcIntoCommand becomes a UnaryCommand whose child is source (targetTable is a TableIdentifier, not a plan, so source is the only plan child). - CreateStreamingTableAutoCdc becomes a BinaryCommand with left = name and right = source, matching CreatePipelineDatasetAsSelect's left = name, right = query. Co-authored-by: Isaac
…reateFlowAutoCdc Rename the local variable holding the AutoCdcIntoCommand from applyChanges to autoCdcInto to match the command type. No behavior change. Co-authored-by: Isaac
…se classes Convert the positional AutoCdcIntoCommand and AutoCdcParams constructions in AstBuilder.parseAutoCdcParams/buildAutoCdcIntoCommand to named parameters. No behavior change. Co-authored-by: Isaac
What changes were proposed in this pull request?
Add SQL syntax for AUTO CDC (Change Data Capture) with SCD Type 1 semantics. Two forms are supported:
Optional clauses: APPLY AS DELETE WHEN, COLUMNS, COLUMNS * EXCEPT.
Why are the changes needed?
So far, there is only a Python API for Auto CDC; this adds a SQL API.
Does this PR introduce any user-facing change?
Yes, it adds a new SQL syntax
How was this patch tested?
[Run / Build modules: pyspark-connect-old-client]. It fails because it tests a client built on 4.0 against the server built from the current commit. The test lists the first 20 SQL keywords and expects a 13-character-wide output because, previously, AUTHORIZATION was among those 20 keywords. My addition of APPLY pushes AUTHORIZATION out of the first 20, and the longest keyword is now ASENSITIVE with 10 characters. This test is quite brittle and has been fixed on master by using .columns instead of .show, but because the client is built from 4.0, the fix needs to be back-ported to 4.0; I cannot fix that in the current PR.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Opus 4.6