diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index d28f0af5dd0af..c3eec6cc5ce3e 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -423,6 +423,7 @@ Below is a list of all the keywords in Spark SQL. |ANTI|non-reserved|strict-non-reserved|non-reserved| |ANY|reserved|non-reserved|reserved| |ANY_VALUE|non-reserved|non-reserved|non-reserved| +|APPLY|non-reserved|non-reserved|non-reserved| |APPROX|non-reserved|non-reserved|non-reserved| |ARCHIVE|non-reserved|non-reserved|non-reserved| |ARRAY|non-reserved|non-reserved|reserved| @@ -432,6 +433,7 @@ Below is a list of all the keywords in Spark SQL. |AT|non-reserved|non-reserved|reserved| |ATOMIC|non-reserved|non-reserved|non-reserved| |AUTHORIZATION|reserved|non-reserved|reserved| +|AUTO|non-reserved|non-reserved|non-reserved| |BEGIN|non-reserved|non-reserved|non-reserved| |BERNOULLI|non-reserved|non-reserved|non-reserved| |BETWEEN|non-reserved|non-reserved|reserved| @@ -456,6 +458,7 @@ Below is a list of all the keywords in Spark SQL. |CAST|reserved|non-reserved|reserved| |CATALOG|non-reserved|non-reserved|non-reserved| |CATALOGS|non-reserved|non-reserved|non-reserved| +|CDC|non-reserved|non-reserved|non-reserved| |CHANGE|non-reserved|non-reserved|non-reserved| |CHANGES|non-reserved|non-reserved|non-reserved| |CHAR|non-reserved|non-reserved|reserved| @@ -744,6 +747,7 @@ Below is a list of all the keywords in Spark SQL. 
|SELECT|reserved|non-reserved|reserved| |SEMI|non-reserved|strict-non-reserved|non-reserved| |SEPARATED|non-reserved|non-reserved|non-reserved| +|SEQUENCE|non-reserved|non-reserved|non-reserved| |SERDE|non-reserved|non-reserved|non-reserved| |SERDEPROPERTIES|non-reserved|non-reserved|non-reserved| |SESSION_USER|reserved|non-reserved|reserved| diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index 0e940ee5b4b01..5b97f32dbfcfc 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -140,6 +140,7 @@ AND: 'AND'; ANTI: 'ANTI'; ANY: 'ANY'; ANY_VALUE: 'ANY_VALUE'; +APPLY: 'APPLY'; APPROX: 'APPROX'; ARCHIVE: 'ARCHIVE'; ARRAY: 'ARRAY' {incComplexTypeLevelCounter();}; @@ -149,6 +150,7 @@ ASENSITIVE: 'ASENSITIVE'; AT: 'AT'; ATOMIC: 'ATOMIC'; AUTHORIZATION: 'AUTHORIZATION'; +AUTO: 'AUTO'; BEGIN: 'BEGIN'; BERNOULLI: 'BERNOULLI'; BETWEEN: 'BETWEEN'; @@ -173,6 +175,7 @@ CASE: 'CASE'; CAST: 'CAST'; CATALOG: 'CATALOG'; CATALOGS: 'CATALOGS'; +CDC: 'CDC'; CHANGE: 'CHANGE'; CHANGES: 'CHANGES'; CHAR: 'CHAR'; @@ -459,6 +462,7 @@ SECURITY: 'SECURITY'; SELECT: 'SELECT'; SEMI: 'SEMI'; SEPARATED: 'SEPARATED'; +SEQUENCE: 'SEQUENCE'; SERDE: 'SERDE'; SERDEPROPERTIES: 'SERDEPROPERTIES'; SESSION_USER: 'SESSION_USER'; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 2466cf62272af..382907f718791 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -416,8 +416,9 @@ statement | unsupportedHiveNativeCommands .*? #failNativeCommand | createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? 
tableProvider? createTableClauses - (AS query)? #createPipelineDataset + (AS query | FLOW autoCdcBody)? #createPipelineDataset | createPipelineFlowHeader insertInto query #createPipelineInsertIntoFlow + | createPipelineFlowHeader autoCdcCommand #createFlowAutoCdc ; materializedView @@ -750,6 +751,41 @@ dmlStatementNoWith notMatchedBySourceClause* #mergeIntoTable ; +autoCdcCommand + : AUTO CDC INTO target=multipartIdentifier + autoCdcParameters + ; + +autoCdcBody + : AUTO CDC autoCdcParameters + ; + +autoCdcParameters + : FROM source=autoCdcSource + KEYS LEFT_PAREN keys=identifierSeq RIGHT_PAREN + autoCdcDeleteClause? + autoCdcSequenceByClause + autoCdcColumnsClause? + ; + +autoCdcSource + : STREAM LEFT_PAREN multipartIdentifier RIGHT_PAREN + ; + +autoCdcDeleteClause + : APPLY AS DELETE WHEN deleteCondition=booleanExpression + ; + +autoCdcSequenceByClause + : SEQUENCE BY sequence=expression + ; + +autoCdcColumnsClause + : COLUMNS ( + columns=identifierSeq | + ASTERISK EXCEPT LEFT_PAREN exceptCols=identifierSeq RIGHT_PAREN) + ; + identifierReference : IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN | multipartIdentifier @@ -1959,6 +1995,7 @@ ansiNonReserved | ANALYZE | ANTI | ANY_VALUE + | APPLY | APPROX | ARCHIVE | ARRAY @@ -1966,6 +2003,7 @@ ansiNonReserved | ASENSITIVE | AT | ATOMIC + | AUTO | BEGIN | BERNOULLI | BETWEEN @@ -1987,6 +2025,7 @@ ansiNonReserved | CASCADE | CATALOG | CATALOGS + | CDC | CHANGE | CHANGES | CHAR @@ -2218,6 +2257,7 @@ ansiNonReserved | SECURITY | SEMI | SEPARATED + | SEQUENCE | SERDE | SERDEPROPERTIES | SET @@ -2346,6 +2386,7 @@ nonReserved | AND | ANY | ANY_VALUE + | APPLY | APPROX | ARCHIVE | ARRAY @@ -2355,6 +2396,7 @@ nonReserved | AT | ATOMIC | AUTHORIZATION + | AUTO | BEGIN | BERNOULLI | BETWEEN @@ -2380,6 +2422,7 @@ nonReserved | CAST | CATALOG | CATALOGS + | CDC | CHANGE | CHANGES | CHAR @@ -2655,6 +2698,7 @@ nonReserved | SECURITY | SELECT | SEPARATED + | SEQUENCE | SERDE | SERDEPROPERTIES | SESSION_USER diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index fe4841042a392..56da6ff1e49ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1362,6 +1362,58 @@ class AstBuilder extends DataTypeAstBuilder withSchemaEvolution) } + protected def buildAutoCdcIntoCommand(ctx: AutoCdcCommandContext): AutoCdcIntoCommand = + withOrigin(ctx) { + val target = visitMultipartIdentifier(ctx.target).asTableIdentifier + val params = parseAutoCdcParams(ctx.autoCdcParameters()) + AutoCdcIntoCommand( + targetTable = target, + source = params.source, + keys = params.keys, + deleteCondition = params.deleteCondition, + sequenceByExpr = params.sequencing, + includeColumns = params.includeColumns, + excludeColumns = params.excludeColumns) + } + + protected def parseAutoCdcParams(params: AutoCdcParametersContext): AutoCdcParams = + withOrigin(params) { + val source = resolveAutoCdcSource(params.source) + val keys = visitIdentifierSeq(params.keys).map(UnresolvedAttribute.quoted) + val deleteCondition = Option(params.autoCdcDeleteClause()) + .map(c => expression(c.deleteCondition)) + val sequencing = expression(params.autoCdcSequenceByClause().sequence) + + val columnsClause = Option(params.autoCdcColumnsClause()) + val includeColumns = columnsClause.collect { + case c if c.columns != null => + visitIdentifierSeq(c.columns).map(UnresolvedAttribute.quoted) + } + val excludeColumns = columnsClause.collect { + case c if c.exceptCols != null => + visitIdentifierSeq(c.exceptCols).map(UnresolvedAttribute.quoted) + } + + AutoCdcParams( + source = source, + keys = keys, + deleteCondition = deleteCondition, + sequencing = sequencing, + includeColumns = includeColumns, + excludeColumns = excludeColumns) + } + + /** + * Parse the AUTO CDC source, which is
a STREAM(multipartIdentifier). It is returned as an + * [[UnresolvedRelation]] marked as a streaming read via the `isStreaming` flag. + */ + protected def resolveAutoCdcSource(ctx: AutoCdcSourceContext): UnresolvedRelation = + withOrigin(ctx) { + val ident = visitMultipartIdentifier(ctx.multipartIdentifier) + createUnresolvedRelation( + ctx, ident, optionsClause = None, writePrivileges = Set.empty, isStreaming = true) + } + /** * Returns the parameters for [[UnresolvedExecuteImmediate]] logical plan. * Expected format: @@ -7710,3 +7762,14 @@ class AstBuilder extends DataTypeAstBuilder } } } + +/** + * Parameters parsed from an AUTO CDC clause. + */ +case class AutoCdcParams( + source: LogicalPlan, + keys: Seq[UnresolvedAttribute], + deleteCondition: Option[Expression], + sequencing: Expression, + includeColumns: Option[Seq[UnresolvedAttribute]], + excludeColumns: Option[Seq[UnresolvedAttribute]]) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AutoCdcIntoCommand.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AutoCdcIntoCommand.scala new file mode 100644 index 0000000000000..54c33825c0fd9 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AutoCdcIntoCommand.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.Expression + +/** + * Logical plan node for an AUTO CDC INTO command, used by Spark Declarative Pipelines. + * + * This represents a CDC (Change Data Capture) operation that applies an ordered change event + * stream from [[source]] into [[targetTable]] using SCD Type 1 (upsert) semantics. + * + * This node serves as a parse-time placeholder for a pipeline CDC definition and cannot be + * executed directly. It will be interpreted by the pipeline submodule once execution support + * is added (SPARK-57402). The [[source]] relation is exposed as the node's child so the analyzer + * resolves it through the normal plan resolution path. + * + * @param targetTable The target table to apply changes into. + * @param source The source relation providing the change events. Always a STREAM(...) + * source (marked as a streaming read). Exposed as the node's child. + * @param keys Column(s) that uniquely identify a row in the target table. + * @param deleteCondition An optional expression that marks a source row as a DELETE operation. + * When absent, all source rows are treated as upserts. + * @param sequenceByExpr Expression that orders CDC events to correctly resolve out-of-order + * arrivals. Must evaluate to a sortable type. Required. + * @param includeColumns An explicit list of source columns to include in the target table. 
+ * [[None]] when no COLUMNS clause was specified. Mutually exclusive with + * [[excludeColumns]]. + * @param excludeColumns Source columns to exclude from the target table (i.e., all columns + * except these). [[None]] when no COLUMNS clause was specified. Mutually + * exclusive with [[includeColumns]]. + */ +case class AutoCdcIntoCommand( + targetTable: TableIdentifier, + source: LogicalPlan, + keys: Seq[UnresolvedAttribute], + deleteCondition: Option[Expression], + sequenceByExpr: Expression, + includeColumns: Option[Seq[UnresolvedAttribute]], + excludeColumns: Option[Seq[UnresolvedAttribute]] +) extends UnaryCommand { + override def child: LogicalPlan = source + + override protected def withNewChildInternal(newChild: LogicalPlan): AutoCdcIntoCommand = + copy(source = newChild) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 40cf5009b97dc..b11890131f359 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -824,6 +824,55 @@ case class CreateStreamingTable( copy(name = newChild) } +/** + * Command parsed from `CREATE STREAMING TABLE <name> ... FLOW AUTO CDC ...` SQL syntax. + * This command serves as a parse-time placeholder for a pipeline CDC definition and cannot be + * executed directly. It will be interpreted by the pipeline submodule once execution support + * is added (SPARK-57402). + * + * The target of the CDC operation is the streaming table itself (given by [[name]]). + * + * [[name]] and [[source]] are exposed as the node's children (left and right respectively) so the + * analyzer resolves them through the normal plan resolution path, mirroring how + * [[CreatePipelineDatasetAsSelect]] exposes its name and query.
+ * + * @param name The streaming table name, which also serves as the CDC target. Exposed as + * the node's left child. + * @param columns User-specified columns for the streaming table. + * @param partitioning Column-based partitioning for the streaming table. + * @param tableSpec Additional table specs. + * @param ifNotExists Whether the table should only be created if it doesn't already exist. + * @param source The source relation providing the change events. Always a STREAM(...) + * source (marked as a streaming read). Exposed as the node's right child. + * @param keys Column(s) that uniquely identify a row in the target table. + * @param deleteCondition An optional expression that marks a source row as a DELETE operation. + * @param sequenceByExpr Expression that orders CDC events to resolve out-of-order arrivals. + * @param includeColumns An explicit list of source columns to include. [[None]] when no COLUMNS + * clause was specified. Mutually exclusive with [[excludeColumns]]. + * @param excludeColumns Source columns to exclude. [[None]] when no COLUMNS clause was specified. + * Mutually exclusive with [[includeColumns]]. + */ +case class CreateStreamingTableAutoCdc( + name: LogicalPlan, + columns: Seq[ColumnDefinition], + partitioning: Seq[Transform], + tableSpec: TableSpecBase, + ifNotExists: Boolean, + source: LogicalPlan, + keys: Seq[UnresolvedAttribute], + deleteCondition: Option[Expression], + sequenceByExpr: Expression, + includeColumns: Option[Seq[UnresolvedAttribute]], + excludeColumns: Option[Seq[UnresolvedAttribute]] +) extends BinaryCommand with CreatePipelineDataset { + override def left: LogicalPlan = name + override def right: LogicalPlan = source + + override protected def withNewChildrenInternal( + newLeft: LogicalPlan, newRight: LogicalPlan): CreateStreamingTableAutoCdc = + copy(name = newLeft, source = newRight) +} + /** * Replace a table with a v2 catalog. 
* diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala index 49c0dccc780c5..61424bd9a1955 100644 --- a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala +++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala @@ -210,7 +210,7 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark val metadata = conn.getMetaData // scalastyle:off line.size.limit // CURRENT_PATH and SYSTEM are excluded: getSQLKeywords drops SQL:2003 reserved words (see companion). - assert(metadata.getSQLKeywords === "ADD,AFTER,AGGREGATE,ALIGN,ALWAYS,ANALYZE,ANTI,ANY_VALUE,APPROX,ARCHIVE,ASC,BERNOULLI,BIN,BINDING,BIN_DISTRIBUTE_RATIO,BIN_END,BIN_START,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CHANGES,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLATIONS,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,CURRENT_DATABASE,CURRENT_SCHEMA,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTANCE,DISTRIBUTE,DIV,DO,ELSEIF,ENFORCED,ESCAPED,EVOLUTION,EXACT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,FIELDS,FILEFORMAT,FIRST,FLOW,FOLLOWING,FORMAT,FORMATTED,FOUND,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,HANDLER,HOURS,IDENTIFIED,IDENTIFIER,IF,IGNORE,ILIKE,IMMEDIATE,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INPATH,INPUT,INPUTFORMAT,INVOKER,ITEMS,ITERATE,JSON,KEY,KEYS,LAST,LAZY,LEAVE,LEVEL,LIMIT,LINES,LIST,LOAD,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MEASURE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTE
S,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NEAREST,NORELY,NULLS,OFFSET,OPTION,OPTIONS,OUTPUTFORMAT,OVERWRITE,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,PRECEDING,PRINCIPALS,PROCEDURES,PROPERTIES,PURGE,QUALIFY,QUARTER,QUERY,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,REDUCE,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,ROLE,ROLES,SCHEMA,SCHEMAS,SECONDS,SECURITY,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SETS,SHORT,SHOW,SIMILARITY,SINGLE,SKEWED,SORT,SORTED,SOURCE,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SYNC,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLES,TARGET,TBLPROPERTIES,TERMINATED,TIMEDIFF,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TOUCH,TRANSACTION,TRANSACTIONS,TRANSFORM,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNIFORM,UNLOCK,UNPIVOT,UNSET,UNTIL,USE,VAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHILE,WIDTH,X,YEARS,ZONE") + assert(metadata.getSQLKeywords === "ADD,AFTER,AGGREGATE,ALIGN,ALWAYS,ANALYZE,ANTI,ANY_VALUE,APPLY,APPROX,ARCHIVE,ASC,AUTO,BERNOULLI,BIN,BINDING,BIN_DISTRIBUTE_RATIO,BIN_END,BIN_START,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CDC,CHANGE,CHANGES,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLATIONS,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,CURRENT_DATABASE,CURRENT_SCHEMA,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTANCE,DISTRIBUTE,DIV,DO,ELSEIF,ENFORCED,ESCAPED,EVOLUTION,EXACT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,FIELDS,FILEFORMAT,FIRST,FLOW,FOLLOWING,FORMAT,FORMATTED,FOUND,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,HANDLER,HOURS,IDENTIFIED,IDENTIFIER,IF,IGNORE,ILIKE,IMMEDIATE,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INPATH,INPUT,INPUTFORMAT,INVOKER,ITEMS,ITERATE,JSON,KEY,KEYS,LAST,LAZY,LEAVE,LEVEL,LIMIT,LINES,LI
ST,LOAD,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MEASURE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTES,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NEAREST,NORELY,NULLS,OFFSET,OPTION,OPTIONS,OUTPUTFORMAT,OVERWRITE,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,PRECEDING,PRINCIPALS,PROCEDURES,PROPERTIES,PURGE,QUALIFY,QUARTER,QUERY,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,REDUCE,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,ROLE,ROLES,SCHEMA,SCHEMAS,SECONDS,SECURITY,SEMI,SEPARATED,SEQUENCE,SERDE,SERDEPROPERTIES,SETS,SHORT,SHOW,SIMILARITY,SINGLE,SKEWED,SORT,SORTED,SOURCE,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SYNC,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLES,TARGET,TBLPROPERTIES,TERMINATED,TIMEDIFF,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TOUCH,TRANSACTION,TRANSACTIONS,TRANSFORM,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNIFORM,UNLOCK,UNPIVOT,UNSET,UNTIL,USE,VAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHILE,WIDTH,X,YEARS,ZONE") // scalastyle:on line.size.limit } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 8ff2161a2965d..2eb2c7e914413 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1571,6 +1571,19 @@ class SparkSqlAstBuilder extends AstBuilder { ) } + override def visitCreateFlowAutoCdc( + ctx: CreateFlowAutoCdcContext): LogicalPlan = withOrigin(ctx) { + val flowHeaderCtx = ctx.createPipelineFlowHeader() + val ident = withIdentClause(flowHeaderCtx.flowName, UnresolvedIdentifier(_)) + val commentOpt = Option(flowHeaderCtx.commentSpec()).map(visitCommentSpec) + val autoCdcInto = buildAutoCdcIntoCommand(ctx.autoCdcCommand()) + 
CreateFlowCommand( + name = ident, + flowOperation = autoCdcInto, + comment = commentOpt + ) + } + override def visitCreatePipelineDataset( ctx: CreatePipelineDatasetContext): LogicalPlan = withOrigin(ctx) { val createPipelineDatasetHeaderCtx = ctx.createPipelineDatasetHeader() @@ -1603,7 +1616,7 @@ class SparkSqlAstBuilder extends AstBuilder { partitionExpressions(partTransforms, partCols, ctx) ++ clusterBySpec.map(_.asTransform) - // Because the createTableClauses grammar is reused for createPipelineDataset but pipeline + // Because the createTableClauses grammar is reused for pipeline datasets but pipeline // datasets don't support bucketing, options, storage location, or Hive SerDe, validate they // are not set. if (bucketSpec.isDefined) { @@ -1646,6 +1659,10 @@ class SparkSqlAstBuilder extends AstBuilder { ) if (createPipelineDatasetHeaderCtx.materializedView() != null) { + if (ctx.autoCdcBody() != null) { + throw operationNotAllowed( + "AUTO CDC is only supported for STREAMING TABLE, not MATERIALIZED VIEW.", ctx) + } val query: ParserRuleContext = Option(ctx.query).getOrElse( throw operationNotAllowed( s"Unable to find query for CREATE $syntaxTypeErrorStr statement.", ctx) @@ -1660,25 +1677,42 @@ class SparkSqlAstBuilder extends AstBuilder { ifNotExists = ifNotExists ) } else if (createPipelineDatasetHeaderCtx.streamingTable() != null) { - Option(ctx.query) match { - case Some(query) => - CreateStreamingTableAsSelect( - name = datasetIdentifier, - columns = colDefs, - partitioning = partitioning, - tableSpec = spec, - query = plan(query), - originalText = source(query), - ifNotExists = ifNotExists - ) - case None => - CreateStreamingTable( - name = datasetIdentifier, - columns = colDefs, - partitioning = partitioning, - tableSpec = spec, - ifNotExists = ifNotExists - ) + if (ctx.autoCdcBody() != null) { + val params = parseAutoCdcParams(ctx.autoCdcBody().autoCdcParameters()) + CreateStreamingTableAutoCdc( + name = datasetIdentifier, + columns = colDefs, + 
partitioning = partitioning, + tableSpec = spec, + ifNotExists = ifNotExists, + source = params.source, + keys = params.keys, + deleteCondition = params.deleteCondition, + sequenceByExpr = params.sequencing, + includeColumns = params.includeColumns, + excludeColumns = params.excludeColumns + ) + } else { + Option(ctx.query) match { + case Some(query) => + CreateStreamingTableAsSelect( + name = datasetIdentifier, + columns = colDefs, + partitioning = partitioning, + tableSpec = spec, + query = plan(query), + originalText = source(query), + ifNotExists = ifNotExists + ) + case None => + CreateStreamingTable( + name = datasetIdentifier, + columns = colDefs, + partitioning = partitioning, + tableSpec = spec, + ifNotExists = ifNotExists + ) + } } } else { // Should never be possible based on grammar definition. diff --git a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out index cbee1375aba83..0c57178048ebe 100644 --- a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out @@ -16,6 +16,7 @@ AND true ANTI false ANY true ANY_VALUE false +APPLY false APPROX false ARCHIVE false ARRAY false @@ -25,6 +26,7 @@ ASENSITIVE false AT false ATOMIC false AUTHORIZATION true +AUTO false BEGIN false BERNOULLI false BETWEEN false @@ -49,6 +51,7 @@ CASE true CAST true CATALOG false CATALOGS false +CDC false CHANGE false CHANGES false CHAR false @@ -335,6 +338,7 @@ SECURITY false SELECT true SEMI false SEPARATED false +SEQUENCE false SERDE false SERDEPROPERTIES false SESSION_USER true diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out index dfdbc5fccb56b..e0528a75c6ef5 100644 --- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out 
@@ -16,6 +16,7 @@ AND false ANTI false ANY false ANY_VALUE false +APPLY false APPROX false ARCHIVE false ARRAY false @@ -25,6 +26,7 @@ ASENSITIVE false AT false ATOMIC false AUTHORIZATION false +AUTO false BEGIN false BERNOULLI false BETWEEN false @@ -49,6 +51,7 @@ CASE false CAST false CATALOG false CATALOGS false +CDC false CHANGE false CHANGES false CHAR false @@ -335,6 +338,7 @@ SECURITY false SELECT false SEMI false SEPARATED false +SEQUENCE false SERDE false SERDEPROPERTIES false SESSION_USER false diff --git a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out index dfdbc5fccb56b..e0528a75c6ef5 100644 --- a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out @@ -16,6 +16,7 @@ AND false ANTI false ANY false ANY_VALUE false +APPLY false APPROX false ARCHIVE false ARRAY false @@ -25,6 +26,7 @@ ASENSITIVE false AT false ATOMIC false AUTHORIZATION false +AUTO false BEGIN false BERNOULLI false BETWEEN false @@ -49,6 +51,7 @@ CASE false CAST false CATALOG false CATALOGS false +CDC false CHANGE false CHANGES false CHAR false @@ -335,6 +338,7 @@ SECURITY false SELECT false SEMI false SEPARATED false +SEQUENCE false SERDE false SERDEPROPERTIES false SESSION_USER false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AutoCdcParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AutoCdcParserSuite.scala new file mode 100644 index 0000000000000..eee19dfefc2ee --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AutoCdcParserSuite.scala @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.v2 + +import org.apache.spark.sql.catalyst.analysis.{ + AnalysisTest, UnresolvedAttribute, UnresolvedIdentifier, UnresolvedRelation} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.{ + AutoCdcIntoCommand, + CreateFlowCommand, + CreateStreamingTableAutoCdc +} +import org.apache.spark.sql.execution.SparkSqlParser + +/** + * Parser tests for AUTO CDC syntax. + * + * Covers two supported forms: + * 1. CREATE FLOW <name> [COMMENT ...] AS AUTO CDC INTO <target> ... + * 2. CREATE STREAMING TABLE <name> ... FLOW AUTO CDC ... + * + * Snapshot CDC, SCD Type 2, IGNORE NULL UPDATES, and APPLY AS TRUNCATE WHEN are not + * supported and should fail to parse. The standalone AUTO CDC INTO form (without CREATE FLOW + * or CREATE STREAMING TABLE) is also not supported. + */ +class AutoCdcParserSuite extends CommandSuiteBase with AnalysisTest { + protected lazy val parser = new SparkSqlParser() + + // --------------------------------------------------------------------------- + // CREATE FLOW ...
AS AUTO CDC INTO + // --------------------------------------------------------------------------- + + test("CREATE FLOW AS AUTO CDC INTO - minimal form") { + val plan = parser.parsePlan( + """CREATE FLOW myflow AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (key1, key2) + |SEQUENCE BY timestamp""".stripMargin) + + val cmd = plan.asInstanceOf[CreateFlowCommand] + assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("myflow")) + assert(cmd.comment.isEmpty) + + val cdc = cmd.flowOperation.asInstanceOf[AutoCdcIntoCommand] + assert(cdc.targetTable.table == "target") + val source = cdc.source.asInstanceOf[UnresolvedRelation] + assert(source.multipartIdentifier == Seq("source")) + assert(source.isStreaming) + assert(cdc.keys.map(_.name) == Seq("key1", "key2")) + assert(cdc.deleteCondition.isEmpty) + assert(cdc.sequenceByExpr == UnresolvedAttribute("timestamp")) + assert(cdc.includeColumns.isEmpty) + assert(cdc.excludeColumns.isEmpty) + } + + test("CREATE FLOW AS AUTO CDC INTO - multipart source name") { + val plan = parser.parsePlan( + """CREATE FLOW myflow AS AUTO CDC INTO target + |FROM STREAM(mycat.myschema.source) + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + + val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand] + val source = cdc.source.asInstanceOf[UnresolvedRelation] + assert(source.multipartIdentifier == Seq("mycat", "myschema", "source")) + assert(source.isStreaming) + } + + test("CREATE FLOW AS AUTO CDC INTO - with COMMENT") { + val plan = parser.parsePlan( + """CREATE FLOW myflow COMMENT 'my comment' AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + + val cmd = plan.asInstanceOf[CreateFlowCommand] + assert(cmd.comment == Some("my comment")) + } + + test("CREATE FLOW AS AUTO CDC INTO - multipart flow name") { + val plan = parser.parsePlan( + """CREATE FLOW mycat.myschema.myflow AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE 
BY ts""".stripMargin) + + val cmd = plan.asInstanceOf[CreateFlowCommand] + assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == + Seq("mycat", "myschema", "myflow")) + } + + test("CREATE FLOW AS AUTO CDC INTO - two-part target table name") { + val plan = parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO myschema.mytable + |FROM STREAM(source) + |KEYS (k) + |SEQUENCE BY ts""".stripMargin) + + val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand] + assert(cdc.targetTable.database == Some("myschema")) + assert(cdc.targetTable.table == "mytable") + } + + test("CREATE FLOW AS AUTO CDC INTO - APPLY AS DELETE WHEN") { + val plan = parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |APPLY AS DELETE WHEN op = 'DELETE' + |SEQUENCE BY ts""".stripMargin) + + val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand] + assert(cdc.deleteCondition.isDefined) + assert(cdc.deleteCondition.get.sql.contains("op")) + } + + test("CREATE FLOW AS AUTO CDC INTO - COLUMNS include list") { + val plan = parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts + |COLUMNS id, name, value""".stripMargin) + + val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand] + assert(cdc.includeColumns.get.map(_.name) == Seq("id", "name", "value")) + assert(cdc.excludeColumns.isEmpty) + } + + test("CREATE FLOW AS AUTO CDC INTO - COLUMNS * EXCEPT list") { + val plan = parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts + |COLUMNS * EXCEPT (op, ts)""".stripMargin) + + val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand] + assert(cdc.includeColumns.isEmpty) + assert(cdc.excludeColumns.get.map(_.name) == Seq("op", "ts")) + } + + test("CREATE FLOW AS AUTO CDC INTO - all clauses 
combined") { + val plan = parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (key1, key2) + |APPLY AS DELETE WHEN key3 = 3 + |SEQUENCE BY timestamp + |COLUMNS key1, key2, key3, timestamp""".stripMargin) + + val cdc = plan.asInstanceOf[CreateFlowCommand].flowOperation.asInstanceOf[AutoCdcIntoCommand] + assert(cdc.keys.map(_.name) == Seq("key1", "key2")) + assert(cdc.deleteCondition.isDefined) + assert(cdc.sequenceByExpr == UnresolvedAttribute("timestamp")) + assert(cdc.includeColumns.get.map(_.name) == Seq("key1", "key2", "key3", "timestamp")) + } + + // --------------------------------------------------------------------------- + // CREATE STREAMING TABLE ... FLOW AUTO CDC + // --------------------------------------------------------------------------- + + test("CREATE STREAMING TABLE FLOW AUTO CDC - minimal form") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (key1, key2) + |SEQUENCE BY timestamp""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("target")) + assert(!cmd.ifNotExists) + val source = cmd.source.asInstanceOf[UnresolvedRelation] + assert(source.multipartIdentifier == Seq("source")) + assert(source.isStreaming) + assert(cmd.keys.map(_.name) == Seq("key1", "key2")) + assert(cmd.deleteCondition.isEmpty) + assert(cmd.sequenceByExpr == UnresolvedAttribute("timestamp")) + assert(cmd.includeColumns.isEmpty) + assert(cmd.excludeColumns.isEmpty) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - multipart source name") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM STREAM(mycat.myschema.source) + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + val source = cmd.source.asInstanceOf[UnresolvedRelation] + assert(source.multipartIdentifier == Seq("mycat", 
"myschema", "source")) + assert(source.isStreaming) + } + + test("CREATE STREAMING TABLE IF NOT EXISTS FLOW AUTO CDC") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE IF NOT EXISTS target + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.ifNotExists) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - multipart table name") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE myschema.mytable + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.name.asInstanceOf[UnresolvedIdentifier].nameParts == Seq("myschema", "mytable")) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - APPLY AS DELETE WHEN") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (id) + |APPLY AS DELETE WHEN op = 'DELETE' + |SEQUENCE BY ts""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.deleteCondition.isDefined) + assert(cmd.deleteCondition.get.sql.contains("op")) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - COLUMNS include list") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts + |COLUMNS id, name, value""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.includeColumns.get.map(_.name) == Seq("id", "name", "value")) + assert(cmd.excludeColumns.isEmpty) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - COLUMNS * EXCEPT list") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts + |COLUMNS * EXCEPT (op, ts)""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.includeColumns.isEmpty) + 
assert(cmd.excludeColumns.get.map(_.name) == Seq("op", "ts")) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - all clauses combined") { + val plan = parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (key1, key2) + |APPLY AS DELETE WHEN key3 = 3 + |SEQUENCE BY timestamp + |COLUMNS * EXCEPT (key4)""".stripMargin) + + val cmd = plan.asInstanceOf[CreateStreamingTableAutoCdc] + assert(cmd.keys.map(_.name) == Seq("key1", "key2")) + assert(cmd.deleteCondition.isDefined) + assert(cmd.sequenceByExpr == UnresolvedAttribute("timestamp")) + assert(cmd.excludeColumns.get.map(_.name) == Seq("key4")) + } + + // --------------------------------------------------------------------------- + // Error cases: missing required clause + // --------------------------------------------------------------------------- + + test("CREATE FLOW AS AUTO CDC INTO - SEQUENCE BY is required") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id)""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "end of input", "hint" -> "") + ) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - SEQUENCE BY is required") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (id)""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "end of input", "hint" -> "") + ) + } + + // --------------------------------------------------------------------------- + // Error cases: wrong clause order + // --------------------------------------------------------------------------- + + test("SEQUENCE BY before APPLY AS DELETE is not allowed") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS 
(id) + |SEQUENCE BY ts + |APPLY AS DELETE WHEN a = 1""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "'APPLY'", "hint" -> "") + ) + } + + test("COLUMNS before SEQUENCE BY is not allowed") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |COLUMNS a, b + |SEQUENCE BY ts""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "'COLUMNS'", "hint" -> "") + ) + } + + // --------------------------------------------------------------------------- + // Error cases: standalone form not supported + // --------------------------------------------------------------------------- + + test("standalone AUTO CDC INTO is not supported") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "'AUTO'", "hint" -> "") + ) + } + + // --------------------------------------------------------------------------- + // Error cases: source must be a STREAM(...) 
+ // --------------------------------------------------------------------------- + + test("CREATE FLOW AS AUTO CDC INTO - source without STREAM is not allowed") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM source + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "'source'", "hint" -> "") + ) + } + + test("CREATE STREAMING TABLE FLOW AUTO CDC - source without STREAM is not allowed") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE STREAMING TABLE target + |FLOW AUTO CDC + |FROM source + |KEYS (id) + |SEQUENCE BY ts""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "'source'", "hint" -> "") + ) + } + + // --------------------------------------------------------------------------- + // Error cases: KEYS and COLUMNS only accept simple identifiers + // --------------------------------------------------------------------------- + + test("KEYS does not accept multipart identifiers") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (a.id) + |SEQUENCE BY ts""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "'.'", "hint" -> "") + ) + } + + test("COLUMNS include list does not accept multipart identifiers") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts + |COLUMNS a.name""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "'.'", "hint" -> "") + ) + } + + test("COLUMNS * EXCEPT list does not accept multipart identifiers") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO 
CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts + |COLUMNS * EXCEPT (a.op)""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "'.'", "hint" -> "") + ) + } + + // --------------------------------------------------------------------------- + // Error cases: unsupported dataset types and table features + // --------------------------------------------------------------------------- + + test("AUTO CDC is not supported for MATERIALIZED VIEW") { + val sql = + """CREATE MATERIALIZED VIEW target + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts""".stripMargin + checkError( + intercept[ParseException] { parser.parsePlan(sql) }, + condition = "_LEGACY_ERROR_TEMP_0035", + parameters = Map( + "message" -> "AUTO CDC is only supported for STREAMING TABLE, not MATERIALIZED VIEW."), + queryContext = Array(ExpectedContext(sql, 0, sql.length - 1)) + ) + } + + test("column constraints are not supported for AUTO CDC streaming table") { + val sql = + """CREATE STREAMING TABLE target (id INT PRIMARY KEY, name STRING) + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts""".stripMargin + checkError( + intercept[ParseException] { parser.parsePlan(sql) }, + condition = "_LEGACY_ERROR_TEMP_0035", + parameters = Map("message" -> + ("Pipeline datasets do not currently support column constraints. 
" + + "Please remove any CHECK, UNIQUE, PK, and FK constraints " + + "specified on the pipeline dataset.")), + queryContext = Array(ExpectedContext(sql, 0, sql.length - 1)) + ) + } + + test("bucketing is not supported for AUTO CDC streaming table") { + val sql = + """CREATE STREAMING TABLE target + |CLUSTERED BY (id) INTO 4 BUCKETS + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts""".stripMargin + checkError( + intercept[ParseException] { parser.parsePlan(sql) }, + condition = "_LEGACY_ERROR_TEMP_0035", + parameters = Map("message" -> + ("Bucketing is not supported for CREATE STREAMING TABLE statements. " + + "Please remove any bucket spec specified in the statement.")), + queryContext = Array(ExpectedContext(sql, 0, sql.length - 1)) + ) + } + + test("options are not supported for AUTO CDC streaming table") { + val sql = + """CREATE STREAMING TABLE target + |OPTIONS (key = 'value') + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts""".stripMargin + checkError( + intercept[ParseException] { parser.parsePlan(sql) }, + condition = "_LEGACY_ERROR_TEMP_0035", + parameters = Map("message" -> + ("Options are not supported for CREATE STREAMING TABLE statements. 
" + + "Please remove any OPTIONS lists specified in the statement.")), + queryContext = Array(ExpectedContext(sql, 0, sql.length - 1)) + ) + } + + test("serde is not supported for AUTO CDC streaming table") { + val sql = + """CREATE STREAMING TABLE target + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts""".stripMargin + checkError( + intercept[ParseException] { parser.parsePlan(sql) }, + condition = "_LEGACY_ERROR_TEMP_0035", + parameters = Map("message" -> + ("Hive SerDe format options are not supported for " + + "CREATE STREAMING TABLE statements.")), + queryContext = Array(ExpectedContext(sql, 0, sql.length - 1)) + ) + } + + test("location is not supported for AUTO CDC streaming table") { + val sql = + """CREATE STREAMING TABLE target + |LOCATION '/tmp/data' + |FLOW AUTO CDC + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts""".stripMargin + checkError( + intercept[ParseException] { parser.parsePlan(sql) }, + condition = "_LEGACY_ERROR_TEMP_0035", + parameters = Map("message" -> + ("Specifying location is not supported for CREATE STREAMING TABLE statements. 
" + + "The storage location for a pipeline dataset is managed by the pipeline itself.")), + queryContext = Array(ExpectedContext(sql, 0, sql.length - 1)) + ) + } + + // --------------------------------------------------------------------------- + // Error cases: deprecated / unsupported syntax + // --------------------------------------------------------------------------- + + test("APPLY AS TRUNCATE WHEN is not supported") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |APPLY AS TRUNCATE WHEN op = 'TRUNCATE' + |SEQUENCE BY ts""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "'TRUNCATE'", "hint" -> "") + ) + } + + test("IGNORE NULL UPDATES is not supported") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |IGNORE NULL UPDATES + |SEQUENCE BY ts""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "'IGNORE'", "hint" -> "") + ) + } + + test("STORED AS SCD TYPE 2 is not supported") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts + |STORED AS SCD TYPE 2""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "'STORED'", "hint" -> "") + ) + } + + test("TRACK HISTORY ON is not supported") { + checkError( + intercept[ParseException] { + parser.parsePlan( + """CREATE FLOW f AS AUTO CDC INTO target + |FROM STREAM(source) + |KEYS (id) + |SEQUENCE BY ts + |TRACK HISTORY ON value1, value2""".stripMargin) + }, + condition = "PARSE_SYNTAX_ERROR", + sqlState = "42601", + parameters = Map("error" -> "'TRACK'", "hint" -> "") + ) + } +} diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 276a266f5d16c..4e8f117dc8a5c 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { val sessionHandle = client.openSession(user, "") val infoValue = client.getInfo(sessionHandle, GetInfoType.CLI_ODBC_KEYWORDS) // scalastyle:off line.size.limit - assert(infoValue.getStringValue == "ADD,AFTER,AGGREGATE,ALIGN,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,APPROX,ARCHIVE,ARRAY,AS,ASC,ASENSITIVE,AT,ATOMIC,AUTHORIZATION,BEGIN,BERNOULLI,BETWEEN,BIGINT,BIN,BINARY,BINDING,BIN_DISTRIBUTE_RATIO,BIN_END,BIN_START,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHANGES,CHAR,CHARACTER,CHECK,CLEAR,CLOSE,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLATIONS,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATABASE,CURRENT_DATE,CURRENT_PATH,CURRENT_SCHEMA,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,DATA,DATABASE,DATABASES,DATE,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAY,DAYOFYEAR,DAYS,DBPROPERTIES,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELETE,DELIMITED,DESC,DESCRIBE,DETERMINISTIC,DFS,DIRECTORIES,DIRECTORY,DISTANCE,DISTINCT,DISTRIBUTE,DIV,DO,DOUBLE,DROP,ELSE,ELSEIF,END,ENFORCED,ESCAPE,ESCAPED,EVOLUTION,EXACT,EXCEPT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXECUTE,EXISTS,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,EXTERNAL,EXTRACT,FALSE,FETCH,FIELDS,FILEFORMAT,FILTER,FIRST,FLOAT,FLOW,FOLLOWING
,FOR,FOREIGN,FORMAT,FORMATTED,FOUND,FROM,FULL,FUNCTION,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,GLOBAL,GRANT,GROUP,GROUPING,HANDLER,HAVING,HOUR,HOURS,IDENTIFIED,IDENTIFIER,IDENTITY,IF,IGNORE,ILIKE,IMMEDIATE,IMPORT,IN,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INNER,INPATH,INPUT,INPUTFORMAT,INSENSITIVE,INSERT,INT,INTEGER,INTERSECT,INTERVAL,INTO,INVOKER,IS,ITEMS,ITERATE,JOIN,JSON,KEY,KEYS,LANGUAGE,LAST,LATERAL,LAZY,LEADING,LEAVE,LEFT,LEVEL,LIKE,LIMIT,LINES,LIST,LOAD,LOCAL,LOCALTIME,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MAX,MEASURE,MERGE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTE,MINUTES,MODIFIES,MONTH,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NATURAL,NEAREST,NEXT,NO,NONE,NORELY,NOT,NULL,NULLS,NUMERIC,OF,OFFSET,ON,ONLY,OPEN,OPTION,OPTIONS,OR,ORDER,OUT,OUTER,OUTPUTFORMAT,OVER,OVERLAPS,OVERLAY,OVERWRITE,PARTITION,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,POSITION,PRECEDING,PRIMARY,PRINCIPALS,PROCEDURE,PROCEDURES,PROPERTIES,PURGE,QUALIFY,QUARTER,QUERY,RANGE,READ,READS,REAL,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,RECURSIVE,REDUCE,REFERENCES,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,RETURN,RETURNS,REVOKE,RIGHT,ROLE,ROLES,ROLLBACK,ROLLUP,ROW,ROWS,SCHEMA,SCHEMAS,SECOND,SECONDS,SECURITY,SELECT,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SESSION_USER,SET,SETS,SHORT,SHOW,SIMILARITY,SINGLE,SKEWED,SMALLINT,SOME,SORT,SORTED,SOURCE,SPECIFIC,SQL,SQLEXCEPTION,SQLSTATE,START,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SUBSTRING,SYNC,SYSTEM,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLE,TABLES,TABLESAMPLE,TARGET,TBLPROPERTIES,TERMINATED,THEN,TIME,TIMEDIFF,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TO,TOUCH,TRAILING,TRANSACTION,TRANSACTIONS,TRANSFORM,TRIM,TRUE,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNIFORM,UNION,UNIQUE,UNKNOWN,UNLOCK,UNPIVOT,UNSET,UNTIL,UPDATE,USE,USER,USING,VALUE,VALUES,VAR,VARCHAR,VARIABLE,VARIANT
,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHEN,WHERE,WHILE,WIDTH,WINDOW,WITH,WITHIN,WITHOUT,X,YEAR,YEARS,ZONE") + assert(infoValue.getStringValue == "ADD,AFTER,AGGREGATE,ALIGN,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,APPLY,APPROX,ARCHIVE,ARRAY,AS,ASC,ASENSITIVE,AT,ATOMIC,AUTHORIZATION,AUTO,BEGIN,BERNOULLI,BETWEEN,BIGINT,BIN,BINARY,BINDING,BIN_DISTRIBUTE_RATIO,BIN_END,BIN_START,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CDC,CHANGE,CHANGES,CHAR,CHARACTER,CHECK,CLEAR,CLOSE,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLATIONS,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATABASE,CURRENT_DATE,CURRENT_PATH,CURRENT_SCHEMA,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,DATA,DATABASE,DATABASES,DATE,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAY,DAYOFYEAR,DAYS,DBPROPERTIES,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELETE,DELIMITED,DESC,DESCRIBE,DETERMINISTIC,DFS,DIRECTORIES,DIRECTORY,DISTANCE,DISTINCT,DISTRIBUTE,DIV,DO,DOUBLE,DROP,ELSE,ELSEIF,END,ENFORCED,ESCAPE,ESCAPED,EVOLUTION,EXACT,EXCEPT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXECUTE,EXISTS,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,EXTERNAL,EXTRACT,FALSE,FETCH,FIELDS,FILEFORMAT,FILTER,FIRST,FLOAT,FLOW,FOLLOWING,FOR,FOREIGN,FORMAT,FORMATTED,FOUND,FROM,FULL,FUNCTION,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,GLOBAL,GRANT,GROUP,GROUPING,HANDLER,HAVING,HOUR,HOURS,IDENTIFIED,IDENTIFIER,IDENTITY,IF,IGNORE,ILIKE,IMMEDIATE,IMPORT,IN,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INNER,INPATH,INPUT,INPUTFORMAT,INSENSITIVE,INSERT,INT,INTEGER,INTERSECT,INTERVAL,INTO,INVOKER,IS,ITEMS,ITERATE,JOIN,JSON,KEY,KEYS,LANGUAGE,LAST,LATERAL,LAZY,LEADING,LEAVE,LEFT,LEVEL,LIKE,LIMIT,LINES,LIST,LOAD,LOCAL,LOCALTIME,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MAX,MEASURE,MERGE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTE,
MINUTES,MODIFIES,MONTH,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NATURAL,NEAREST,NEXT,NO,NONE,NORELY,NOT,NULL,NULLS,NUMERIC,OF,OFFSET,ON,ONLY,OPEN,OPTION,OPTIONS,OR,ORDER,OUT,OUTER,OUTPUTFORMAT,OVER,OVERLAPS,OVERLAY,OVERWRITE,PARTITION,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,POSITION,PRECEDING,PRIMARY,PRINCIPALS,PROCEDURE,PROCEDURES,PROPERTIES,PURGE,QUALIFY,QUARTER,QUERY,RANGE,READ,READS,REAL,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,RECURSIVE,REDUCE,REFERENCES,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,RETURN,RETURNS,REVOKE,RIGHT,ROLE,ROLES,ROLLBACK,ROLLUP,ROW,ROWS,SCHEMA,SCHEMAS,SECOND,SECONDS,SECURITY,SELECT,SEMI,SEPARATED,SEQUENCE,SERDE,SERDEPROPERTIES,SESSION_USER,SET,SETS,SHORT,SHOW,SIMILARITY,SINGLE,SKEWED,SMALLINT,SOME,SORT,SORTED,SOURCE,SPECIFIC,SQL,SQLEXCEPTION,SQLSTATE,START,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SUBSTRING,SYNC,SYSTEM,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLE,TABLES,TABLESAMPLE,TARGET,TBLPROPERTIES,TERMINATED,THEN,TIME,TIMEDIFF,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TO,TOUCH,TRAILING,TRANSACTION,TRANSACTIONS,TRANSFORM,TRIM,TRUE,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNIFORM,UNION,UNIQUE,UNKNOWN,UNLOCK,UNPIVOT,UNSET,UNTIL,UPDATE,USE,USER,USING,VALUE,VALUES,VAR,VARCHAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHEN,WHERE,WHILE,WIDTH,WINDOW,WITH,WITHIN,WITHOUT,X,YEAR,YEARS,ZONE") // scalastyle:on line.size.limit } }