Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources

import java.io.{Closeable, InputStream}
import java.io.{Closeable, File, FileOutputStream, InputStream}
import java.util.Locale
import java.util.regex.Pattern
import java.util.zip.GZIPInputStream
Expand All @@ -32,8 +32,10 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.util.LineReader

import org.apache.spark.TaskContext
import org.apache.spark.util.HadoopFSUtils
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.{HadoopFSUtils, Utils}

/**
* Streaming reader for a single archive file. The archive is opened once and decompressed/unpacked
Expand Down Expand Up @@ -68,6 +70,34 @@ abstract class ArchiveReader(path: Path) {
conf: Configuration,
ignoredPathSegmentRegex: Pattern = HadoopFSUtils.defaultIgnoredPathSegmentRegexPattern)(
parseEntry: (String, InputStream) => Iterator[T]): Iterator[T]

/**
 * Random-access counterpart to [[readEntries]]: instead of handing each entry to a streaming
 * parser, every kept entry is written to its own file under `localDir` and surfaced as
 * `(entryName, localFile)`. Intended for formats that need a complete, seekable file rather than
 * an `InputStream` (e.g. Parquet/ORC, which read a trailing footer).
 *
 * Entries are materialized one at a time as the returned iterator advances, so a caller that
 * consumes sequentially and deletes each file after use keeps at most one entry in `localDir`
 * at a time. Ownership of the produced files, and of `localDir` itself, stays with the caller.
 *
 * @param conf        Hadoop conf used to open and decompress the archive.
 * @param localDir    an existing local directory the entries are written into.
 * @param entryFilter predicate over the entry name; entries it rejects are skipped without
 *                    being written to disk. The default keeps every entry that
 *                    [[readEntries]] surfaces (i.e. non-directory, non-dotfile entries).
 * @return an iterator of `(entry name, local file)`; each file lives until the caller deletes it.
 */
final def localizeEntries(
    conf: Configuration,
    localDir: File,
    entryFilter: String => Boolean = _ => true): Iterator[(String, File)] = {
  readEntries(conf) { (entryName, entryStream) =>
    if (!entryFilter(entryName)) {
      // Rejected entries never touch the disk.
      Iterator.empty
    } else {
      // The copy runs here, inside the callback, rather than lazily when the pair is consumed.
      val localFile = ArchiveReader.copyEntryToLocalFile(entryStream, localDir, entryName)
      Iterator.single((entryName, localFile))
    }
  }
}
}

object ArchiveReader {
Expand Down Expand Up @@ -133,6 +163,117 @@ object ArchiveReader {
}
}
}

/**
 * Copies one archive entry's bytes to a fresh file under `localDir`, used by
 * [[ArchiveReader.localizeEntries]] to localize entries for random-access formats. The entry
 * stream is not closed here (the reader owns the underlying archive stream); the output file is.
 * Uniqueness comes from [[File.createTempFile]], so entries that share a basename across archive
 * subdirectories never collide; the sanitized basename is kept only as a readable suffix.
 *
 * If the copy fails, the partially written file is deleted before the failure propagates: its
 * path is never returned, so the caller would otherwise have no handle with which to clean it up.
 *
 * @param in        bytes of one archive entry.
 * @param localDir  an existing local directory the file is created in.
 * @param entryName the entry's name within the archive, used only to derive a readable file name.
 * @return the local file the entry was written to.
 */
private def copyEntryToLocalFile(in: InputStream, localDir: File, entryName: String): File = {
  // Keep only the last path segment and replace anything outside a conservative character set,
  // so the archive-supplied name cannot introduce separators into the local file name.
  val rawBasename = entryName.substring(entryName.lastIndexOf('/') + 1)
  val basename = rawBasename.replaceAll("[^A-Za-z0-9._-]", "_")
  val local = File.createTempFile("archive-entry-", "-" + basename, localDir)
  // Success flag instead of a catch-all handler: every failure (including fatal errors) still
  // propagates untouched, and the cleanup below runs only when the copy did not complete.
  var copied = false
  try {
    val out = new FileOutputStream(local)
    try Utils.copyStream(in, out) finally out.close()
    copied = true
    local
  } finally {
    if (!copied) {
      // Best effort: a failed delete must not mask the original failure.
      local.delete()
    }
  }
}

/**
 * Reads an archive of random-access files: each kept entry is unpacked to a temp file under a
 * fresh local dir and read with `readOne`. The counterpart to [[readEntries]] for formats that
 * need a complete file (Parquet/ORC footers, Excel). Entries are unpacked one at a time, each
 * reader and temp file released before the next opens; the temp dir, current reader, and archive
 * stream are released on task completion, so an abandoned read (e.g. a `LIMIT`) does not leak.
 * The per-entry [[PartitionedFile]] points at the temp file but keeps the archive's path and
 * modification time, so `input_file_name()` / `_metadata` still report the archive.
 *
 * Temp dir cleanup: when a [[TaskContext]] is present, a task-completion listener deletes the
 * temp dir. When there is none, the dir is deleted when the returned iterator is closed. If
 * opening the archive fails before anything is returned, the dir is deleted immediately.
 *
 * @param file        the archive, as handed to the scan; its metadata is reused for every entry.
 * @param conf        Hadoop conf used to open and decompress the archive.
 * @param entryFilter keeps only entries whose name satisfies it.
 * @param tempPrefix  name prefix for the temp dir the entries are unpacked into.
 * @param readOne     reads a single localized entry.
 * @return the concatenation of `readOne`'s output over all kept entries; the returned iterator
 *         is also a `Closeable`.
 */
def readLocalizedEntries(
    file: PartitionedFile,
    conf: Configuration,
    entryFilter: String => Boolean,
    tempPrefix: String)(
    readOne: PartitionedFile => Iterator[InternalRow]): Iterator[InternalRow] = {
  val tempDir = Utils.createTempDir(Utils.getLocalDir(SparkEnv.get.conf), tempPrefix)
  // Captured up front so `close()` below knows whether a completion listener owns `tempDir`.
  val taskContext = Option(TaskContext.get())
  val entries = try {
    ArchiveReader(file.toPath).localizeEntries(conf, tempDir, entryFilter)
  } catch {
    case NonFatal(e) =>
      // No listener is registered yet and no iterator is returned, so this is the only chance
      // to remove the temp dir. Cleanup is best effort and must not mask the original failure.
      try Utils.deleteRecursively(tempDir) catch { case NonFatal(_) => }
      throw e
  }

  // Element type is `Object`, not `InternalRow`: a batch scan yields `ColumnarBatch`, so
  // per-element casts would fail; the whole iterator is cast once at the end.
  val rows = new Iterator[Object] with Closeable {
    private var current: Iterator[Object] = Iterator.empty
    private var currentFile: File = _
    private var done = false

    // Closes `resource` if it happens to be Closeable, swallowing non-fatal close failures.
    private def closeQuietly(resource: AnyRef): Unit = resource match {
      case c: Closeable => try c.close() catch { case NonFatal(_) => }
      case _ =>
    }

    // Closes the current per-entry reader and removes its temp file.
    private def releaseCurrent(): Unit = {
      closeQuietly(current)
      current = Iterator.empty
      if (currentFile != null) {
        currentFile.delete()
        currentFile = null
      }
    }

    // Advance on `hasNext`, not in `next`, so a reader reusing a mutable batch is not probed for
    // the next entry before the current batch is consumed.
    private def advance(): Unit = {
      while (!done && !current.hasNext) {
        releaseCurrent()
        if (entries.hasNext) {
          val (_, entryFile) = entries.next()
          // Record the file before opening it so it is still deleted if `readOne` throws.
          currentFile = entryFile
          val entryLength = entryFile.length()
          current = readOne(file.copy(
            filePath = SparkPath.fromUri(entryFile.toURI),
            start = 0L,
            length = entryLength,
            fileSize = entryLength,
            modificationTime = file.modificationTime)).asInstanceOf[Iterator[Object]]
        } else {
          done = true
        }
      }
    }

    override def hasNext: Boolean = {
      advance()
      !done && current.hasNext
    }

    override def next(): Object = {
      if (!hasNext) throw new NoSuchElementException
      current.next()
    }

    override def close(): Unit = {
      done = true
      releaseCurrent()
      closeQuietly(entries)
      // Without a TaskContext no completion listener was registered below, so this is the only
      // place left to remove the temp dir. With one, deletion stays in the listener.
      if (taskContext.isEmpty) {
        try Utils.deleteRecursively(tempDir) catch { case NonFatal(_) => }
      }
    }
  }

  // Only delete the temp dir here; do NOT close `rows`. `rows` is Closeable and becomes
  // FileScanRDD's `currentIterator`, which its own (early-registered) task-completion listener
  // closes -- after downstream exec nodes' listeners, per reverse-order execution. Closing the
  // per-entry reader from this listener (registered during iteration, so it would run first)
  // would free the vectorized reader's off-heap column vectors while downstream operators may
  // still reference them, re-introducing the SPARK-37089 use-after-free. FileScanRDD's close of
  // `rows` still releases the current reader and the archive stream, so nothing leaks.
  taskContext.foreach(_.addTaskCompletionListener[Unit] { _ =>
    Utils.deleteRecursively(tempDir)
  })
  rows.asInstanceOf[Iterator[InternalRow]]
}
}

/**
Expand Down
Loading