Skip to content

[CONNECT][SDP][SPARK-56249] Create AUTO CDC syntax for SCD Type 1 in SQL#56419

Open
anew wants to merge 16 commits into
apache:masterfrom
anew:scd1-sql-api
Open

[CONNECT][SDP][SPARK-56249] Create AUTO CDC syntax for SCD Type 1 in SQL#56419
anew wants to merge 16 commits into
apache:masterfrom
anew:scd1-sql-api

Conversation

@anew

@anew anew commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Add SQL syntax for AUTO CDC (Change Data Capture) with SCD Type 1 semantics. Two forms are supported:

  1. CREATE FLOW AS AUTO CDC INTO FROM KEYS (...) SEQUENCE BY
  2. CREATE [OR REFRESH] STREAMING TABLE FLOW AUTO CDC FROM KEYS (...) SEQUENCE BY

Optional clauses: APPLY AS DELETE WHEN, COLUMNS, COLUMNS * EXCEPT.

Why are the changes needed?

So far, there is only a Python API for Auto CDC; this adds a SQL API.

Does this PR introduce any user-facing change?

Yes, it adds a new SQL syntax

How was this patch tested?

  • New tests are added: AutoCdcParserSuite
  • some tests are failing in CI. They are all unrelated to my changes and also broken in master, except for
    [Run / Build modules: pyspark-connect-old-client]. It fails because it tests a client built on 4.0 to query the
    server built from the current commit. It lists the first 20 sql keywords, and expects a 13-char wide output because
    previously, AUTHORIZATION was among those 20 keywords. My addition of APPLY pushes AUTHORIZATION out
    of the first 20, and the longest keyword is now ASENSITIVE with 10 characters. This test is quite brittle and has
    been fixed on master by using .columns instead of .show, but because the client is built from 4.0, this fix needs
    to be back-ported to 4.0, I cannot fix that in the current PR.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Opus 4.6

@szehon-ho szehon-ho left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few review notes on the new AUTO CDC SQL parsing — mostly around the source relation handling, converging on the existing ChangeArgs/AutoCdcFlow model used by the Python/Connect path, and some consistency/cleanliness nits. Details inline.

params, "SEQUENCE BY", "AUTO CDC INTO")
}

val sourceTable = plan(params.source.relationPrimary)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plan(params.source.relationPrimary) only consumes the relationPrimary of source=relation. Since relation : LATERAL? relationPrimary relationExtension*, any join/pivot/unpivot/LATERAL on the source is parsed and then silently discarded. For example, both of these parse fine but use only source:

-- JOIN dropped; only `source` is read
CREATE FLOW f AS AUTO CDC INTO target
FROM source JOIN dim ON source.id = dim.id
KEYS (id) SEQUENCE BY ts

-- PIVOT dropped; only `source` is read
CREATE FLOW f AS AUTO CDC INTO target
FROM source PIVOT (sum(amt) FOR region IN ('US','EU'))
KEYS (id) SEQUENCE BY ts

Either restrict the grammar to source=relationPrimary so the extra syntax is a parse error, or build from the full relation. Also note the Connect path models the source as a streaming relation (UnresolvedRelation(isStreaming = true)); here it's a plain relation with no streaming marker, so resolution semantics will diverge from the Python/Connect path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only want the source itself. Let me fix this

/**
* Command parsed from `CREATE STREAMING TABLE <name> FLOW AUTO CDC ...` SQL syntax.
* This command serves as a parse-time placeholder for a pipeline CDC definition and cannot be
* executed directly. It is interpreted by the pipeline submodule during a pipeline execution.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This says the node "is interpreted by the pipeline submodule during a pipeline execution," but there's currently no handler: SqlGraphRegistrationContext.processSqlQuery has no case for CreateStreamingTableAutoCdc (it hits the unsupportedLogicalPlan branch), and CreateFlowHandler only accepts InsertIntoStatement. So this SQL parses but always fails at registration today. Please soften to something like "will be interpreted once execution support is added" (and link the follow-up), or add the handling. Same wording applies to AutoCdcIntoCommand.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I thought this would be ok since I will follow up with that in the next PR.

checkDuplicateClauses(params.autoCdcColumnsClause(), "COLUMNS", params)

if (params.autoCdcSequenceByClause().isEmpty) {
throw QueryParsingErrors.missingClausesForOperation(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Connect path reports specific conditions for the same mistakes — AUTOCDC_MISSING_SEQUENCE_BY, AUTOCDC_MISSING_SOURCE, AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST, AUTOCDC_NON_COLUMN_IDENTIFIER (see PipelinesHandler.buildAutoCdcFlow). Here a missing SEQUENCE BY raises MISSING_CLAUSES_FOR_OPERATION instead, so the same error surfaces a different condition depending on whether the user came via SQL or Python. Could we reuse the AUTOCDC_* conditions (at minimum for missing sequence-by) for consistency across front-ends?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, good suggestion

APPLY: 'APPLY';
APPROX: 'APPROX';
ARCHIVE: 'ARCHIVE';
AUTO: 'AUTO';

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AUTO is out of alphabetical order here (between ARCHIVE and ARRAY); it should sit after AUTHORIZATION. Same issue for SEQUENCE (between SCHEMA and SCHEMAS), and in nonReserved / the sql-ref-ansi-compliance.md table (where AUTO is listed before ATOMIC). The generated .sql.out golden files are already correctly sorted — just the hand-maintained lists are off. Minor, but worth fixing since you already have an ordering-fix commit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch will do. (it is amazing how many mistakes Claude makes when sorting :)

// Error cases: duplicate clauses
// ---------------------------------------------------------------------------

test("duplicate SEQUENCE BY clause") {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These intercept[ParseException] cases (duplicate SEQUENCE BY/APPLY AS DELETE/COLUMNS, "standalone", APPLY AS TRUNCATE, etc.) don't assert the cause. E.g. the TRUNCATE case fails only because TRUNCATE isn't in the grammar, not because of a deliberate rejection — so it'd still "pass" if the parser broke for an unrelated reason. Suggest asserting getCondition (e.g. DUPLICATE_CLAUSES for the duplicate cases), like the missing-SEQUENCE BY test above already does.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added for the errors where we explicitly throw an exception.
For the cases that the parser already catches, it's hard to predict what the error code would be.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, for those cases, I can just assert PARSE_SYNTAX_ERROR

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added all assertions, but note that now we check syntax errors for syntaxes that do not exist yet :)

assert(cmd.exceptCols.map(_.name) == Seq("op", "ts"))
}

test("CREATE STREAMING TABLE FLOW AUTO CDC - all clauses combined") {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

visitCreateStreamingTableAutoCdc rejects MATERIALIZED VIEW, bucketing, options, serde, location, and column constraints via operationNotAllowed, but none of those branches are covered here. Please add at least the MV-rejection case (and ideally column-constraints), since they're easy to regress silently.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


val ifNotExists = headerCtx.EXISTS() != null
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
val (colDefs, colConstraints) = Option(ctx.tableElementList()).map(visitTableElementList)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole prelude — the colConstraints check, the visitCreateTableClauses destructure, the partitioning computation, the bucketSpec/options/serde/location guards, and the TableSpec(...) block — is duplicated almost verbatim from visitCreatePipelineDataset below. Suggest extracting a shared private helper (e.g. returning (colDefs, partitioning, spec, ifNotExists, tableIdent)) and calling it from both, so they can't drift.

Note the copy also quietly shortened the error messages (e.g. dropping the "Please remove any CHECK, UNIQUE, PK, and FK constraints..." guidance, and collapsing the STORED AS vs Hive-SerDe distinction). Factoring out the helper would keep these consistent with the existing CREATE STREAMING TABLE messages.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

@anew anew requested a review from szehon-ho June 12, 2026 01:58
|APPLY AS DELETE WHEN b = 2
|SEQUENCE BY ts""".stripMargin)
}
assert(e.getCondition == "DUPLICATE_CLAUSES")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Worth checking the specific clause being duplicated, it would be easy to end up with a bug pointing to the wrong one if we add more clauses in the future

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point but with the latest change this cannot happen no more.

@AnishMahto AnishMahto left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flushing out comments for API and data classes, which look mostly good to me! Will review parser logic/validations soon, but no need to block on me.

"message" : [
"Missing required clause(s) <clauses> for operation <operation>."
],
"sqlState" : "42601"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me this sounds like a semantic error not a syntactical error, maybe use 42613 or something else?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is now obsolete, with the fixed order it cannot happen any more

Comment on lines +420 to +422
| createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? tableProvider?
createTableClauses
FLOW autoCdcBody #createStreamingTableAutoCdc

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we consider collapsing createStreamingTableAutoCdc with createPipelineDataset using | for the query clause?

I.e is there a good reason for duplicating the shared createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? tableProvider? createTableClauses.

Comment on lines +769 to +772
(autoCdcDeleteClause
| autoCdcSequenceByClause
| autoCdcColumnsClause
)*

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is * the right expression to use here? Implies there can be multiple sequence clauses for example, and pushes validation down to AST logic rather than being invalidated at a syntactical level.

sequenceByExpr: Expression,
specifiedCols: Seq[UnresolvedAttribute],
exceptCols: Seq[UnresolvedAttribute]
) extends LeafCommand {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forget, does LeafCommand imply the command has no child logical plans? Any chance we should be using UnaryCommand given the sourceTable child?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably true.

},
"sqlState" : "0A000"
},
"MISSING_CLAUSES_FOR_OPERATION" : {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually used anywhere?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interestingly this is used by the parser for METRIC VIEWS now, so I can't remove it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed it should stay, though not for AUTO CDC: on current master QueryParsingErrors.missingClausesForOperation is referenced by the METRIC VIEW path (SparkSqlParser), but MISSING_CLAUSES_FOR_OPERATION is not actually defined in error-conditions.json on master -- so this entry is what backs that error. Probably cleaner to land it as its own small fix since it is orthogonal to AUTO CDC.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will send a separate PR for this

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in this PR: #56827

|KEYS (id)
|SEQUENCE BY ts""".stripMargin)
}
assert(e.getMessage.contains("AUTO CDC is only supported for STREAMING TABLE"))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: here and other tests, use checkError and assert on expected error conditions/message parameters.

@AnishMahto AnishMahto left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Final comments, after taking a look at the AST builder logic!

Comment on lines +1374 to +1379
LogicalPlan,
Seq[UnresolvedAttribute],
Option[Expression],
Expression,
Seq[UnresolvedAttribute],
Seq[UnresolvedAttribute]) =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: Might be nice to put this in a case class before returning, and then AutoCdcIntoCommand can also just hold a reference to said case class. Or maybe even just directly return an AutoCdcIntoCommand from this function.

I generally don't like returning a tuple longer than 2-3 elements, its hard to keep track of position <-> semantic value mentally.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, same reaction. A 6-element tuple destructured at two call sites is easy to misorder; a small AutoCdcParams case class (or returning AutoCdcIntoCommand directly) would be clearer.

Seq[UnresolvedAttribute]) =
withOrigin(params) {
val sourceTable = plan(params.source) match {
case r: UnresolvedRelation => r.copy(isStreaming = true)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of silently forcing the unresolved relation to be streaming, can we just throw if the user tries specifying a non-streaming relation? The error can guide users to use STREAM.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. Silently coercing to isStreaming = true (and the case other => other passthrough) hides cases the contract does not actually allow. I'd throw a clear error for a non-streaming/non-identifier source and point users at STREAM(...), matching the Connect/Python path which models the source as a streaming relation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed this to always require STREAM()

withOrigin(params) {
val sourceTable = plan(params.source) match {
case r: UnresolvedRelation => r.copy(isStreaming = true)
case other => other

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we invalidate against all other logical plans?

I don't know how this concretely maps against logical plan subclasses, but at the syntax level we should only be allowing source to be a multipart identifier, or STREAM(multipart identifier).

I think both of these will map directly to an UnresolvedRelation. And then can we take this a further where AutoCdcIntoCommand requires source to be a TableIdentifier/Identifier rather than an arbitrary LogicalPlan? Then I guess AutoCdcIntoCommand can go back to being a leaf plan type.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually in SDP we expect source to simply be a string identifier:

.parseTableIdentifier(name = autoCdcDetails.getSource, spark = sessionHolder.session)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Since source=relationPrimary, case other => other currently accepts subqueries, inline VALUES, TVFs, etc. as the source child and passes them through unmarked. Given SDP expects source to be a plain identifier (optional string source), I'd restrict this to a (optionally STREAM-wrapped) table identifier and reject the rest. That also lets AutoCdcIntoCommand carry a TableIdentifier and go back to a leaf, as you noted.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I then change the syntax to only accept a tableidentifier?

*
* @return (colDefs, partitioning, spec, ifNotExists, tableIdent)
*/
private def parsePipelineDatasetPrelude(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this function now that the same createPipelineDataset rule is hit for both AS query and FLOW AUTO CDC sub-rules?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 -- after the grammar merge removed visitCreateStreamingTableAutoCdc, this helper has a single caller (visitCreatePipelineDataset), and all of its params are just forwarded from that one ctx. The dedup rationale is gone, so I'd inline it back to avoid the extra 6-param/5-tuple indirection.


autoCdcParameters
: FROM source=relationPrimary
KEYS LEFT_PAREN keys=multipartIdentifierList RIGHT_PAREN

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For keys and the column selection clause, we don't actually allow multipart identifiers as part of the AutoCDC API contract.

Should we use identifier instead of multipartIdentifierList to reflect this? Either way SDP will invalidate multipart identifiers, but I figure we should make it true be construction where possible.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 -- keys and the COLUMNS list only allow simple column identifiers per the contract, so multipartIdentifierList lets e.g. KEYS (a.b.c) parse and pushes the check downstream. Using identifierSeq/identifierList makes it correct by construction.

anew added 16 commits June 26, 2026 17:27
Add SQL syntax for AUTO CDC (Change Data Capture) with SCD Type 1 semantics.
Two forms are supported:

1. CREATE FLOW <name> AS AUTO CDC INTO <target> FROM <source> KEYS (...) SEQUENCE BY <expr>
2. CREATE [OR REFRESH] STREAMING TABLE <name> FLOW AUTO CDC FROM <source> KEYS (...) SEQUENCE BY <expr>

Optional clauses: APPLY AS DELETE WHEN, COLUMNS, COLUMNS * EXCEPT.

Co-authored-by: Isaac
…and ANSI compliance doc

Add APPLY, AUTO, CDC, and SEQUENCE as non-reserved keywords in both
ANSI and default modes, and document them in sql-ref-ansi-compliance.md.

Co-authored-by: Isaac
… keyword lists

Regenerate SQL golden files and add APPLY, AUTO, CDC, SEQUENCE to
the hardcoded keyword lists in ThriftServer and SparkConnect tests.

Co-authored-by: Isaac
…x from AUTO CDC

The createPipelineDatasetHeader grammar rule inadvertently allowed
CREATE OR REFRESH for streaming tables and materialized views. This
was not intended for the AUTO CDC syntax. Remove the optional
(OR REFRESH)? from the grammar and the corresponding orRefresh field
from CreateStreamingTableAutoCdc.

Co-authored-by: Isaac
…parsing

- Restrict grammar source to relationPrimary to reject JOINs/PIVOTs
- Mark source relation as streaming (isStreaming = true) for consistency
  with Connect/Python path
- Use AUTOCDC_MISSING_SEQUENCE_BY and AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST
  error conditions for consistency across front-ends
- Fix keyword ordering: AUTO after AUTHORIZATION, SEQUENCE after SEPARATED
  in lexer, parser rules, and ANSI compliance doc
- Extract parsePipelineDatasetPrelude helper to deduplicate
  visitCreateStreamingTableAutoCdc and visitCreatePipelineDataset
- Soften execution doc wording for CreateStreamingTableAutoCdc and
  AutoCdcIntoCommand
- Add tests for MV rejection, column constraints, bucketing, options,
  serde, location, and assert error conditions on all error cases

Co-authored-by: Isaac
…yntax

Change the AUTO CDC grammar from a repeating alternation to a fixed
order: APPLY AS DELETE (optional) → SEQUENCE BY (required) → COLUMNS
(optional). Remove the now-unnecessary duplicate clause and mutual
exclusion checks from the parser.

Co-authored-by: Isaac
…eatePipelineDataset grammar rule

Unify the two grammar alternatives for pipeline dataset creation into a
single createPipelineDataset rule with an optional (AS query | FLOW
autoCdcBody) tail. Remove the separate visitCreateStreamingTableAutoCdc
visitor and handle all three streaming table forms (CTAS, bare, AutoCDC)
in visitCreatePipelineDataset.

Also change AutoCdcIntoCommand from LeafCommand to UnaryCommand since it
has sourceTable as a child plan.

Co-authored-by: Isaac
…r tests

Replace plain assert checks with checkError for all error test cases,
validating condition, sqlState, parameters, and queryContext.

Co-authored-by: Isaac
…OLUMNS to simple identifiers

Tighten the AUTO CDC grammar and AST building:

- The AUTO CDC source now only accepts STREAM(multipartIdentifier); the bare
  multipart-identifier form is removed. The source is always returned as an
  UnresolvedRelation marked as a streaming read (isStreaming = true).
- KEYS and COLUMNS now only accept simple identifiers (identifierSeq) instead of
  multipart identifiers, and are built as single-part UnresolvedAttributes.
- Inline the pipeline-dataset prelude into visitCreatePipelineDataset and use the
  AutoCdcParams case class for parameter passing.

Adds parser tests covering the STREAM requirement and rejection of multipart
identifiers in KEYS and COLUMNS for both AUTO CDC variants.

Co-authored-by: Isaac
Move the MISSING_CLAUSES_FOR_OPERATION error-condition definition out of
this branch; it belongs to a separate change and will be submitted in its
own PR. The QueryParsingErrors.missingClausesForOperation helper and its
SparkSqlParser callers are already on master (from the METRIC VIEW work)
and are unaffected.

Co-authored-by: Isaac
Rename the AUTO CDC column fields specifiedCols/exceptCols to
includeColumns/excludeColumns and change them from Seq to
Option[Seq[UnresolvedAttribute]], so an absent COLUMNS clause (None) is
distinguishable from an empty list. The rename and Option type are
propagated through AutoCdcParams, AutoCdcIntoCommand,
CreateStreamingTableAutoCdc, SparkSqlParser, and the parser tests.

Co-authored-by: Isaac
Make the AUTO CDC source relation a child of the command nodes so the
analyzer resolves it through the normal plan resolution path, mirroring
MERGE INTO and the sibling CreatePipelineDatasetAsSelect:

- AutoCdcIntoCommand becomes a UnaryCommand whose child is source
  (targetTable is a TableIdentifier, not a plan, so source is the only
  plan child).
- CreateStreamingTableAutoCdc becomes a BinaryCommand with left = name
  and right = source, matching CreatePipelineDatasetAsSelect's
  left = name, right = query.

Co-authored-by: Isaac
…reateFlowAutoCdc

Rename the local variable holding the AutoCdcIntoCommand from applyChanges
to autoCdcInto to match the command type. No behavior change.

Co-authored-by: Isaac
…se classes

Convert the positional AutoCdcIntoCommand and AutoCdcParams constructions
in AstBuilder.parseAutoCdcParams/buildAutoCdcIntoCommand to named
parameters. No behavior change.

Co-authored-by: Isaac
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants