Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.testing.TestCompileRequest;
import org.apache.flink.table.gateway.api.testing.TestCompileResponse;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -368,4 +370,19 @@ <ClusterID> ClusterID deployScript(
@Nullable String script,
Configuration executionConfig)
throws SqlGatewayException;

// -------------------------------------------------------------------------
// Testing API
// -------------------------------------------------------------------------

/**
* Compile a test plan for a pipeline target.
*
* <p>Loads the pipeline into an analysis session, resolves mock schemas via DESCRIBE, detects
* window functions and temporal joins via EXPLAIN, and returns the compiled plan.
*
* @param request the compile request
* @return the compiled test plan
*/
TestCompileResponse compileTestPlan(TestCompileRequest request) throws SqlGatewayException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.gateway.api.testing;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
* Request to compile a test plan for a pipeline target.
*
* <p>The CLI pre-splits the rendered pipeline SQL into individual statements and sends them in
* pipeline order (tables before views). The gateway loads them into an analysis session to resolve
* schemas and dependencies.
*/
public final class TestCompileRequest {

private final List<String> statements;
private final String target;
private final String mode;
private final List<String> mockTargets;

public TestCompileRequest(
List<String> statements, String target, String mode, List<String> mockTargets) {
this.statements = Collections.unmodifiableList(Objects.requireNonNull(statements));
this.target = Objects.requireNonNull(target);
this.mode = Objects.requireNonNull(mode);
this.mockTargets = Collections.unmodifiableList(Objects.requireNonNull(mockTargets));
}

/** Pre-split pipeline SQL statements in dependency order. */
public List<String> getStatements() {
return statements;
}

/** The top-level view or table to test. */
public String getTarget() {
return target;
}

/** Execution mode: {@code "batch"} or {@code "changelog"}. */
public String getMode() {
return mode;
}

/** Names of pipeline objects to be replaced with mock data. */
public List<String> getMockTargets() {
return mockTargets;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.gateway.api.testing;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
* Response from compiling a test plan.
*
* <p>Contains everything the CLI needs to set up an execution session. The execution order is:
*
* <ol>
* <li>{@code configStatements} — session config, functions, USE statements (before mocks)
* <li>Mock creation — CLI creates mocks using {@code mocks} specifications
* <li>{@code pipelineStatements} — view DDL between mock boundaries and the target (after mocks)
* <li>Execute {@code querySql}
* </ol>
*/
public final class TestCompileResponse {

private final String contractVersion;
private final List<String> configStatements;
private final List<String> pipelineStatements;
private final String querySql;
private final List<String> warnings;
private final List<TestMockSpec> mocks;

public TestCompileResponse(
String contractVersion,
List<String> configStatements,
List<String> pipelineStatements,
String querySql,
List<String> warnings,
List<TestMockSpec> mocks) {
this.contractVersion = Objects.requireNonNull(contractVersion);
this.configStatements =
Collections.unmodifiableList(Objects.requireNonNull(configStatements));
this.pipelineStatements =
Collections.unmodifiableList(Objects.requireNonNull(pipelineStatements));
this.querySql = Objects.requireNonNull(querySql);
this.warnings = Collections.unmodifiableList(Objects.requireNonNull(warnings));
this.mocks = Collections.unmodifiableList(Objects.requireNonNull(mocks));
}

/** The testing API contract version. */
public String getContractVersion() {
return contractVersion;
}

/**
* Config/function statements to execute BEFORE creating mocks. Includes SET, CREATE FUNCTION,
* USE CATALOG, etc.
*/
public List<String> getConfigStatements() {
return configStatements;
}

/**
* Pipeline view DDL to execute AFTER creating mocks. These are the views between mock
* boundaries and the target, in dependency order.
*/
public List<String> getPipelineStatements() {
return pipelineStatements;
}

/** The final query to execute against the target. */
public String getQuerySql() {
return querySql;
}

/** Non-fatal warnings (e.g., temporal join detection). */
public List<String> getWarnings() {
return warnings;
}

/** Mock specifications with exact resolved schemas and materialization strategies. */
public List<TestMockSpec> getMocks() {
return mocks;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.gateway.api.testing;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
* Specification for a required mock in a compiled test plan.
*
* <p>The mock name matches the original pipeline object name so it can be created as a direct
* replacement in the execution session. The schema contains the exact resolved column types from the
* Flink catalog.
*/
public final class TestMockSpec {

/** Create mock as a VALUES-based temporary view. Suitable for batch and non-windowed cases. */
public static final String MATERIALIZATION_VIEW = "view";

/**
* Create mock as a filesystem-backed temporary table with watermarks. Required for windowed
* streaming/changelog tests where watermark advancement is needed.
*/
public static final String MATERIALIZATION_FILESYSTEM = "filesystem";

private final String requestedName;
private final String sessionObject;
private final String materialization;
private final List<TestSchemaColumn> schema;
@Nullable private final String watermarkColumn;
private final boolean includeSentinel;

public TestMockSpec(
String requestedName,
String sessionObject,
String materialization,
List<TestSchemaColumn> schema,
@Nullable String watermarkColumn,
boolean includeSentinel) {
this.requestedName = Objects.requireNonNull(requestedName, "requestedName");
this.sessionObject = Objects.requireNonNull(sessionObject, "sessionObject");
this.materialization = Objects.requireNonNull(materialization, "materialization");
this.schema = Collections.unmodifiableList(Objects.requireNonNull(schema, "schema"));
this.watermarkColumn = watermarkColumn;
this.includeSentinel = includeSentinel;
}

/** The canonical name of the mock target as specified in the test suite. */
public String getRequestedName() {
return requestedName;
}

/**
* The SQL identifier the toolkit should use when creating the mock object. For v1 (top-level
* targets only), this matches {@code requestedName}. For v2 with CTE targeting, this may differ
* — e.g., a CTE mock like {@code charges_view.base_deduped} would need a dot-free session
* identifier.
*/
public String getSessionObject() {
return sessionObject;
}

/** The materialization strategy: {@code "view"} or {@code "filesystem"}. */
public String getMaterialization() {
return materialization;
}

/** The exact resolved schema columns with SQL type strings. */
public List<TestSchemaColumn> getSchema() {
return schema;
}

/** The watermark column name, if filesystem materialization requires watermarks. */
@Nullable
public String getWatermarkColumn() {
return watermarkColumn;
}

/** Whether a sentinel row should be appended to advance watermarks past all windows. */
public boolean isIncludeSentinel() {
return includeSentinel;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.gateway.api.testing;

import java.util.Objects;

/** A column name and SQL type pair in a resolved mock schema. */
public final class TestSchemaColumn {

private final String name;
private final String type;

public TestSchemaColumn(String name, String type) {
this.name = Objects.requireNonNull(name, "name");
this.type = Objects.requireNonNull(type, "type");
}

public String getName() {
return name;
}

public String getType() {
return type;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TestSchemaColumn)) {
return false;
}
TestSchemaColumn that = (TestSchemaColumn) o;
return name.equals(that.name) && type.equals(that.type);
}

@Override
public int hashCode() {
return Objects.hash(name, type);
}

@Override
public String toString() {
return name + " " + type;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.gateway.api.testing;

/** Constants for the testing API surface. */
public final class TestingApi {

/** Contract version advertised by the testing endpoints. */
public static final String CONTRACT_VERSION = "v2";

private TestingApi() {}
}
Loading
Loading