diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala index 75f04f8f38c6..4972f8f1f981 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ArchiveReader.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources -import java.io.{Closeable, InputStream} +import java.io.{Closeable, File, FileOutputStream, InputStream} import java.util.Locale import java.util.regex.Pattern import java.util.zip.GZIPInputStream @@ -32,8 +32,10 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.io.Text import org.apache.hadoop.util.LineReader -import org.apache.spark.TaskContext -import org.apache.spark.util.HadoopFSUtils +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.paths.SparkPath +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.util.{HadoopFSUtils, Utils} /** * Streaming reader for a single archive file. The archive is opened once and decompressed/unpacked @@ -68,6 +70,34 @@ abstract class ArchiveReader(path: Path) { conf: Configuration, ignoredPathSegmentRegex: Pattern = HadoopFSUtils.defaultIgnoredPathSegmentRegexPattern)( parseEntry: (String, InputStream) => Iterator[T]): Iterator[T] + + /** + * Materializes each archive entry to a file under `localDir` and yields `(entryName, localFile)` + * for every entry kept by `entryFilter`. This is the counterpart to [[readEntries]] for formats + * that cannot be parsed from a streaming `InputStream` and instead need random access to a + * complete file (e.g. Parquet/ORC, which read a trailing footer). Entries are materialized one at + * a time as the returned iterator advances, so a caller that consumes sequentially and deletes + * each file after use keeps at most one entry in `localDir` at a time. The caller owns the + * lifetime of the produced files and of `localDir`. + * + * @param conf Hadoop conf used to open and decompress the archive. + * @param localDir an existing local directory the entries are written into. + * @param entryFilter keeps only entries whose name satisfies it; entries that fail it are skipped + * without being written to disk. Defaults to keeping every entry that + * [[readEntries]] surfaces (i.e. non-directory, non-dotfile entries). + * @return an iterator of `(entry name, local file)`; each file lives until the caller deletes it. + */ + final def localizeEntries( + conf: Configuration, + localDir: File, + entryFilter: String => Boolean = _ => true): Iterator[(String, File)] = + readEntries(conf) { (name, in) => + if (entryFilter(name)) { + Iterator.single((name, ArchiveReader.copyEntryToLocalFile(in, localDir, name))) + } else { + Iterator.empty + } + } } object ArchiveReader { @@ -133,6 +163,117 @@ object ArchiveReader { } } } + + /** + * Copies one archive entry's bytes to a fresh file under `localDir`, used by + * [[ArchiveReader.localizeEntries]] to localize entries for random-access formats. The entry + * stream is not closed here (the reader owns the underlying archive stream); the output file is. + * Uniqueness comes from [[File.createTempFile]], so entries that share a basename across archive + * subdirectories never collide; the sanitized basename is kept only as a readable suffix. + * + * @param in bytes of one archive entry. + * @param localDir an existing local directory the file is created in. + * @param entryName the entry's name within the archive, used only to derive a readable file name. + * @return the local file the entry was written to. + */ + private def copyEntryToLocalFile(in: InputStream, localDir: File, entryName: String): File = { + val rawBasename = entryName.substring(entryName.lastIndexOf('/') + 1) + val basename = rawBasename.replaceAll("[^A-Za-z0-9._-]", "_") + val local = File.createTempFile("archive-entry-", "-" + basename, localDir) + val out = new FileOutputStream(local) + try Utils.copyStream(in, out) finally out.close() + local + } + + /** + * Reads an archive of random-access files: each kept entry is unpacked to a temp file under a + * fresh local dir and read with `readOne`. The counterpart to [[readEntries]] for formats that + * need a complete file (Parquet/ORC footers, Excel). Entries are unpacked one at a time, each + * reader and temp file released before the next opens; the temp dir, current reader, and archive + * stream are released on task completion, so an abandoned read (e.g. a `LIMIT`) does not leak. + * The per-entry [[PartitionedFile]] points at the temp file but keeps the archive's path and + * modification time, so `input_file_name()` / `_metadata` still report the archive. + */ + def readLocalizedEntries( + file: PartitionedFile, + conf: Configuration, + entryFilter: String => Boolean, + tempPrefix: String)( + readOne: PartitionedFile => Iterator[InternalRow]): Iterator[InternalRow] = { + val tempDir = Utils.createTempDir(Utils.getLocalDir(SparkEnv.get.conf), tempPrefix) + val entries = ArchiveReader(file.toPath).localizeEntries(conf, tempDir, entryFilter) + + // Element type is `Object`, not `InternalRow`: a batch scan yields `ColumnarBatch`, so + // per-element casts would fail; the whole iterator is cast once at the end. + val rows = new Iterator[Object] with Closeable { + private var current: Iterator[Object] = Iterator.empty + private var currentFile: File = _ + private var done = false + + private def releaseCurrent(): Unit = { + current match { + case c: Closeable => try c.close() catch { case NonFatal(_) => } + case _ => + } + current = Iterator.empty + if (currentFile != null) { + currentFile.delete() + currentFile = null + } + } + + // Advance on `hasNext`, not in `next`, so a reader reusing a mutable batch is not probed for + // the next entry before the current batch is consumed. + private def advance(): Unit = { + while (!done && !current.hasNext) { + releaseCurrent() + if (entries.hasNext) { + val (_, entryFile) = entries.next() + currentFile = entryFile + current = readOne(file.copy( + filePath = SparkPath.fromUri(entryFile.toURI), + start = 0L, + length = entryFile.length(), + fileSize = entryFile.length(), + modificationTime = file.modificationTime)).asInstanceOf[Iterator[Object]] + } else { + done = true + } + } + } + + override def hasNext: Boolean = { + advance() + !done && current.hasNext + } + + override def next(): Object = { + if (!hasNext) throw new NoSuchElementException + current.next() + } + + override def close(): Unit = { + done = true + releaseCurrent() + entries match { + case c: Closeable => try c.close() catch { case NonFatal(_) => } + case _ => + } + } + } + + // Only delete the temp dir here; do NOT close `rows`. `rows` is Closeable and becomes + // FileScanRDD's `currentIterator`, which its own (early-registered) task-completion listener + // closes -- after downstream exec nodes' listeners, per reverse-order execution. Closing the + // per-entry reader from this listener (registered during iteration, so it would run first) + // would free the vectorized reader's off-heap column vectors while downstream operators may + // still reference them, re-introducing the SPARK-37089 use-after-free. FileScanRDD's close of + // `rows` still releases the current reader and the archive stream, so nothing leaks. + Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit] { _ => + Utils.deleteRecursively(tempDir) + }) + rows.asInstanceOf[Iterator[InternalRow]] + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 733e55a8609e..da887232eb63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -17,13 +17,15 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.io.{Closeable, FileNotFoundException} +import java.io.{Closeable, File, FileNotFoundException, IOException} import java.time.ZoneId +import java.util.Locale import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.jdk.CollectionConverters._ import scala.util.{Failure, Try} +import scala.util.control.NonFatal import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.conf.Configuration @@ -37,16 +39,17 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GRO import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.util.HadoopInputFile -import org.apache.spark.TaskContext +import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys.{PATH, SCHEMA} import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.FileSourceOptions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes -import org.apache.spark.sql.catalyst.util.{DateTimeUtils, RebaseDateTime} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, RebaseDateTime} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.types.ops.ParquetTypeOps @@ -86,9 +89,86 @@ class ParquetFileFormat sparkSession: SparkSession, parameters: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { + val parquetOptions = new ParquetOptions(parameters, getSqlConf(sparkSession)) + if (parquetOptions.archiveFormatEnabled && + files.exists(f => ArchiveReader.isArchivePath(f.getPath))) { + return inferArchiveSchema(sparkSession, parameters, files, parquetOptions.mergeSchema, + parquetOptions.ignoreCorruptFiles, parquetOptions.ignoreMissingFiles) + } ParquetUtils.inferSchema(sparkSession, parameters, files) } + /** + * Schema inference when the inputs include archives. Loose files use the standard + * [[ParquetUtils.inferSchema]] path and seed the merge; each archive's entries fold in one at a + * time via [[ParquetFileFormat.mergeArchiveEntrySchemas]], bounding disk to one entry. + * `mergeSchema = false` samples a single schema. A corrupt/missing archive is skipped under the + * matching `ignore*` flag. + */ + private def inferArchiveSchema( + sparkSession: SparkSession, + parameters: Map[String, String], + files: Seq[FileStatus], + mergeSchema: Boolean, + ignoreCorruptFiles: Boolean, + ignoreMissingFiles: Boolean): Option[StructType] = { + val conf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) + val (archives, nonArchives) = files.partition(f => ArchiveReader.isArchivePath(f.getPath)) + val tempDir = Utils.createTempDir(namePrefix = "parquet-archive-infer") + + // Folds one archive's entries (at most `limit`) into `seed`, always closing the archive stream + // (the driver has no TaskContext to do it); skips the archive under the matching ignore flag. + def foldArchive( + archive: FileStatus, seed: Option[StructType], limit: Int): Option[StructType] = { + try { + val entries = ArchiveReader(archive.getPath) + .localizeEntries(conf, tempDir, ParquetFileFormat.isParquetArchiveEntry) + try ParquetFileFormat.mergeArchiveEntrySchemas( + sparkSession, parameters, entries.take(limit), seed) + finally entries match { + case c: Closeable => c.close() + case _ => + } + } catch { + case e: Exception if ignoreMissingFiles && + ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) => + logWarning(log"Skipping missing archive during inference: " + + log"${MDC(PATH, archive.getPath.toString)}", e) + seed + case NonFatal(e) => + Utils.getRootCause(e) match { + // A missing archive is a FileNotFoundException (a subtype of IOException); it must be + // governed by ignoreMissingFiles (handled above), not silently dropped as corrupt under + // ignoreCorruptFiles -- matching FileScanRDD, which rethrows missing-file errors + // regardless of ignoreCorruptFiles. + case _: FileNotFoundException => throw e + case _: RuntimeException | _: IOException if ignoreCorruptFiles => + logWarning(log"Skipping corrupt archive during inference: " + + log"${MDC(PATH, archive.getPath.toString)}", e) + seed + case _ => throw e + } + } + } + + try { + val looseSchema = + if (nonArchives.nonEmpty) ParquetUtils.inferSchema(sparkSession, parameters, nonArchives) + else None + if (mergeSchema) { + archives.foldLeft(looseSchema)((acc, a) => foldArchive(a, acc, Int.MaxValue)) + } else if (looseSchema.isDefined) { + // Not merging: the loose schema already samples one, so leave the archives unopened. + looseSchema + } else { + // Sample the first archive entry that yields a schema. + archives.iterator.map(foldArchive(_, None, 1)).find(_.isDefined).flatten + } + } finally { + Utils.deleteRecursively(tempDir) + } + } + /** * Returns whether the reader can return the rows as batch or not. */ @@ -113,6 +193,12 @@ class ParquetFileFormat sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = { + // An archive is read by unpacking its entries, so the archive itself is a single split + // rather than carved into byte ranges. + val parquetOptions = new ParquetOptions(options, getSqlConf(sparkSession)) + if (parquetOptions.archiveFormatEnabled && ArchiveReader.isArchivePath(path)) { + return false + } true } @@ -204,6 +290,7 @@ class ParquetFileFormat val parquetOptions = new ParquetOptions(options, sqlConf) val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead + val archiveFormatEnabled = parquetOptions.archiveFormatEnabled // Should always be set by FileSourceScanExec creating this. // Check conf before checking option, to allow working around an issue by changing conf. @@ -219,7 +306,7 @@ class ParquetFileFormat assert(supportBatch(sparkSession, resultSchema)) } - (file: PartitionedFile) => { + val readSingleFile: PartitionedFile => Iterator[InternalRow] = (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) val split = new FileSplit(file.toPath, file.start, file.length, Array.empty[String]) @@ -308,6 +395,25 @@ class ParquetFileFormat } } } + + // An archive is read by unpacking each Parquet entry to a local temp file and reading it with + // the plain JVM reader -- Parquet needs random access to its footer, so it cannot be streamed. + // ArchiveReader.readLocalizedEntries owns the unpack/iterate/cleanup; the per-entry read reuses + // readSingleFile, which keeps `input_file_name()` / `_metadata` pointing at the archive path. + def readArchiveFile(file: PartitionedFile): Iterator[InternalRow] = + ArchiveReader.readLocalizedEntries( + file, broadcastedHadoopConf.value.value, + ParquetFileFormat.isParquetArchiveEntry, "parquet-archive") { + entryFile => readSingleFile(entryFile) + } + + (file: PartitionedFile) => { + if (archiveFormatEnabled && ArchiveReader.isArchivePath(file.toPath)) { + readArchiveFile(file) + } else { + readSingleFile(file) + } + } } // scalastyle:off argcount @@ -445,6 +551,16 @@ class ParquetFileFormat } object ParquetFileFormat extends Logging { + + /** + * Whether an archive entry is a Parquet data file. Skips non-Parquet sidecars an archive may + * contain (e.g. `_SUCCESS`, `_committed_*`, `_common_metadata`) so an archive reads like a + * directory of Parquet part-files. The read path and schema inference must use the same + * predicate, or the inferred schema would not match the data that is read. + */ + private[parquet] def isParquetArchiveEntry(name: String): Boolean = + name.toLowerCase(Locale.ROOT).endsWith(".parquet") + val ROW_INDEX = "row_index" // A name for a temporary column that holds row indexes computed by the file format reader @@ -567,6 +683,19 @@ object ParquetFileFormat extends Logging { parameters: Map[String, String], filesToTouch: Seq[FileStatus], sparkSession: SparkSession): Option[StructType] = { + val reader = buildSchemaReader(sparkSession) + SchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader) + } + + /** + * Builds the Parquet footer-reading function, shared by the distributed merge + * ([[mergeSchemasInParallel]]) and the sequential archive-entry merge + * ([[mergeArchiveEntrySchemas]]). `SQLConf` is read here so the returned closure stays + * serializable. + */ + private[parquet] def buildSchemaReader( + sparkSession: SparkSession) + : (Seq[FileStatus], Configuration, Boolean, Boolean) => Seq[StructType] = { val sqlConf = SessionStateHelper.getSqlConf(sparkSession) val assumeBinaryIsString = sqlConf.isParquetBinaryAsString val assumeInt96IsTimestamp = sqlConf.isParquetINT96AsTimestamp @@ -576,7 +705,7 @@ object ParquetFileFormat extends Logging { val respectUnknownTypeAnnotation = sqlConf.parquetReaderRespectUnknownTypeAnnotation - val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean, + (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean, ignoreMissingFiles: Boolean) => { // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` val converter = new ParquetToSparkSchemaConverter( @@ -590,8 +719,46 @@ object ParquetFileFormat extends Logging { readParquetFootersInParallel(conf, files, ignoreCorruptFiles, ignoreMissingFiles) .map(ParquetFileFormat.readSchemaFromFooter(_, converter)) } + } - SchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader) + /** + * Driver-side, sequential analog of [[mergeSchemasInParallel]] for archive entries: they unpack + * to a driver-local temp dir that executors can't read, so the schemas can't be merged in a + * distributed job. Reads each lazily-unpacked entry's footer and deletes it before pulling the + * next, so disk holds one entry at a time; `seed` is the starting schema (loose files). The + * caller owns closing `entries`. + */ + private[sql] def mergeArchiveEntrySchemas( + sparkSession: SparkSession, + parameters: Map[String, String], + entries: Iterator[(String, File)], + seed: Option[StructType]): Option[StructType] = { + val caseSensitive = SessionStateHelper.getSqlConf(sparkSession).caseSensitiveAnalysis + val fileSourceOptions = new FileSourceOptions(CaseInsensitiveMap(parameters)) + val conf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) + val reader = buildSchemaReader(sparkSession) + + var merged = seed + entries.foreach { case (_, file) => + try { + // The footer reader needs only a path and length (cf. mergeSchemasInParallel). + val status = new FileStatus( + file.length(), false, 0, 0, file.lastModified(), 0, null, null, null, + new Path(file.toURI)) + reader(Seq(status), conf, fileSourceOptions.ignoreCorruptFiles, + fileSourceOptions.ignoreMissingFiles).foreach { schema => + merged = Some(merged.fold(schema) { acc => + try acc.merge(schema, caseSensitive) catch { + case cause: SparkException => + throw QueryExecutionErrors.failedMergingSchemaError(acc, schema, cause) + } + }) + } + } finally { + file.delete() + } + } + merged } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuiteBase.scala index 8ec73fbfefda..33dcc4be8ffe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ArchiveReadSuiteBase.scala @@ -343,19 +343,6 @@ trait ArchiveReadSuiteBase extends QueryTest with SharedSparkSession { } } - test("archive inference widens a column's type across entries like a directory") { - // The column is integral in the first entry and string in the second; inference over all - // entries widens the merged type to string, exactly as a directory read would. - withArchiveFile() { archive => - writeArchive(archive, Seq( - entryName(0) -> encodeFile(Seq(1, 2).toDF("c")), - entryName(1) -> encodeFile(Seq("x").toDF("c")))) - val schema = inferredSchema(Seq(archive.getCanonicalPath)) - assert(schema.length == 1 && schema.head.dataType == StringType, - s"expected the column widened to string across entries, got $schema") - } - } - test("inference merges archive entries with loose files in the same directory") { // An archive entry and a loose file with the same schema infer one combined schema, matching // a directory read of the same files. @@ -379,6 +366,22 @@ trait ArchiveReadSuiteBase extends QueryTest with SharedSparkSession { // ----- shared schema-merge tests (run when `supportsSchemaMerge`) ---------- if (supportsSchemaMerge) { + test("archive inference widens a column's type across entries like a directory") { + // The column is integral in the first entry and string in the second; inference over all + // entries widens the merged type to string, exactly as a directory read would. This is a + // schema-merge behavior (cross-entry type reconciliation), so it is gated by + // `supportsSchemaMerge` rather than `supportsSchemaInference`: a format can infer a single + // file's schema without reconciling differing types across files (e.g. Parquet). + withArchiveFile() { archive => + writeArchive(archive, Seq( + entryName(0) -> encodeFile(Seq(1, 2).toDF("c")), + entryName(1) -> encodeFile(Seq("x").toDF("c")))) + val schema = inferredSchema(Seq(archive.getCanonicalPath)) + assert(schema.length == 1 && schema.head.dataType == StringType, + s"expected the column widened to string across entries, got $schema") + } + } + test("archive entries with differing fields read like a directory") { // One entry carries an extra field the other lacks; read under a schema covering both, the // missing field reads back null -- exactly as a directory read of the same files does. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ParquetArchiveReadBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ParquetArchiveReadBase.scala new file mode 100644 index 000000000000..e6bc9dbc3d0c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ParquetArchiveReadBase.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.nio.file.Files + +import org.apache.spark.sql.DataFrame +import org.apache.spark.util.Utils + +/** + * Binds [[ArchiveReadSuiteBase]]'s file-format hooks to Parquet. Unlike the streaming formats, + * Parquet entries are unpacked to a local file before reading (Parquet needs random access to its + * footer), but the read/inference parity contract the base verifies is identical. Parquet is + * self-describing, so [[inferenceOptions]] stays empty and the base's schema-inference tests run as + * well. There is no header concept, so (unlike CSV) there is a single base rather than header and + * headerless variants. + */ +trait ParquetArchiveReadBase extends ArchiveReadSuiteBase { + + override protected def format: String = "parquet" + + override protected def fileExtension: String = "parquet" + + override protected def readOptions: Map[String, String] = Map.empty + + override protected def readSchema: String = "id INT, name STRING" + + // Parquet has an authoritative per-file schema and only unions differing fields across files when + // `mergeSchema` is set, so it does not union by name during default inference the way the + // schema-on-read formats do. The differing-fields *read* (missing column -> null) is still + // supported and is covered by a Parquet-specific test in ParquetTarArchiveReadSuite. + override protected def supportsSchemaMerge: Boolean = false + + override protected def encodeFile( + df: DataFrame, + writeOptions: Map[String, String]): Array[Byte] = { + val dir = Utils.createTempDir(namePrefix = "archive-test-encode") + try { + df.coalesce(1).write.format("parquet") + .options(writeOptions).mode("overwrite").save(dir.getCanonicalPath) + val parts = dir.listFiles().filter { f => + f.isFile && !f.getName.startsWith("_") && !f.getName.startsWith(".") && + !f.getName.endsWith(".crc") + } + assert(parts.length == 1, + s"expected exactly one data file, got: ${parts.map(_.getName).toList}") + Files.readAllBytes(parts.head.toPath) + } finally Utils.deleteRecursively(dir) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ParquetTarArchiveReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ParquetTarArchiveReadSuite.scala new file mode 100644 index 000000000000..20c501a76f06 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ParquetTarArchiveReadSuite.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.nio.file.Files + +import org.apache.spark.sql.functions.input_file_name +import org.apache.spark.sql.internal.SQLConf + +/** + * Reads of Parquet files packed in tar archives (`.tar`/`.tar.gz`/`.tgz`): the shared archive + * read/inference parity tests from [[ArchiveReadSuiteBase]] (bound to Parquet by + * [[ParquetArchiveReadBase]] over tar containers via [[TarArchiveReadBase]]), plus Parquet-specific + * tests for the unpack-to-disk read path. + */ +class ParquetTarArchiveReadSuite + extends ArchiveReadSuiteBase + with ParquetArchiveReadBase + with TarArchiveReadBase { + + import testImplicits._ + + for (vectorized <- Seq(true, false)) { + test(s"archive reads return the same rows with vectorized reader = $vectorized") { + // Parquet entries are read with the vectorized (columnar) reader or the row-based reader + // depending on this flag; both must read archive entries identically to a directory read. + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { + assertArchiveMatchesDir( + Seq(entryName(0) -> encodeFile(sampleDf((1, "Alice"), (2, "Bob"))))) + } + } + } + + test("input_file_name reports the archive path, not the unpacked temp file") { + withArchiveFile() { archive => + writeArchive(archive, Seq(entryName(0) -> encodeFile(sampleDf((1, "Alice"), (2, "Bob"))))) + val paths = read(archive.getCanonicalPath) + .select(input_file_name()).distinct().collect().map(_.getString(0)) + assert(paths.forall(_.contains("archive.tar")), + s"expected the archive path, got: ${paths.toList}") + assert(paths.forall(p => !p.contains("parquet-archive")), + s"the unpacked temp path leaked into input_file_name: ${paths.toList}") + } + } + + test("an abandoned read (LIMIT) over an archive returns partial rows and cleans up") { + withArchiveFile() { archive => + val parts = (0 until 4).map(i => entryName(i) -> encodeFile(sampleDf((i, s"v$i")))) + writeArchive(archive, parts) + // LIMIT stops iteration before later entries are reached; the task-completion listener + // removes the temp dir. Assert the query runs and returns exactly the requested rows. + assert(read(archive.getCanonicalPath).limit(2).collect().length == 2) + } + } + + test("archive entries with differing fields read like a directory") { + // Parquet does not union schemas during inference (supportsSchemaMerge = false), but reading + // entries with differing fields under a covering schema still fills the missing column with + // null, exactly as a directory read of the same files does. + val withName = sampleDf((1, "Alice"), (2, "Bob")) + val idOnly = Seq(3).toDF("id") + assertArchiveMatchesDir( + Seq(entryName(0) -> encodeFile(withName), entryName(1) -> encodeFile(idOnly))) + } + + test("archive inference unions differing fields across entries with mergeSchema=true") { + // Parquet does not union schemas during default inference, but `mergeSchema=true` folds every + // entry's schema; over an archive that folds each unpacked entry one at a time. The unioned + // schema must match a directory read of the same files under the same option. + val withName = sampleDf((1, "Alice"), (2, "Bob")) + val idExtra = Seq((3, 30)).toDF("id", "extra") + val entries = Seq(entryName(0) -> encodeFile(withName), entryName(1) -> encodeFile(idExtra)) + val merge = Map("mergeSchema" -> "true") + withArchiveFile() { archive => + writeArchive(archive, entries) + val archiveSchema = inferredSchema(Seq(archive.getCanonicalPath), merge) + withTempDir { dir => + entries.foreach { case (n, b) => Files.write(new File(dir, n).toPath, b) } + assert(archiveSchema.fieldNames.toSet == Set("id", "name", "extra"), + s"expected the union of entry fields, got $archiveSchema") + assert(archiveSchema == inferredSchema(Seq(dir.getCanonicalPath), merge), + s"archive mergeSchema inference diverged from a directory read; got $archiveSchema") + } + } + } +}