Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

/**
 * Test suite for reading Avro files packed in zip archives (`.zip`).
 *
 * The suite declares no members of its own; it only composes three pieces: the shared archive
 * tests from [[ArchiveReadSuiteBase]], the Avro-specific ones from [[AvroArchiveReadBase]], and
 * the zip container support from [[ZipArchiveReadBase]].
 */
class AvroZipArchiveReadSuite
extends ArchiveReadSuiteBase
with AvroArchiveReadBase
with ZipArchiveReadBase
Original file line number Diff line number Diff line change
Expand Up @@ -2727,11 +2727,8 @@ object SQLConf {
.createWithDefaultString("128MB") // parquet.block.size

val ARCHIVE_FORMAT_READER_ENABLED = buildConf("spark.sql.files.archive.reader.enabled")
.doc("When true, a supported data source can read tar archives (.tar, .tar.gz, .tgz): " +
"each archive is read as a single split and its entries are streamed through that data " +
"source's parser (never unpacked to disk), as if the entries were separate files, both " +
"during scan and schema inference. The CSV, JSON, and text data sources support " +
"reading archives.")
.doc("When true, supported file-based data sources can read archive files, reading each " +
"archive's entries as if they were separate files, during both scan and schema inference.")
.version("5.0.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import java.util.zip.GZIPInputStream

import scala.util.control.NonFatal

import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream}
import org.apache.commons.compress.archivers.{ArchiveEntry, ArchiveInputStream}
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
import org.apache.commons.io.ByteOrderMark
import org.apache.commons.io.input.{BOMInputStream, CloseShieldInputStream}
import org.apache.hadoop.conf.Configuration
Expand All @@ -48,12 +50,49 @@ import org.apache.spark.util.HadoopFSUtils
* `parseEntry` that turns one entry stream into rows (or tokens). Formats that need random access
* within a file (e.g. Parquet/ORC footers) cannot use this streaming path.
*
* A concrete subclass implements [[readEntries]] for a specific archive format. Obtain the reader
* for a path via `ArchiveReader(path)`, which selects the implementation by file extension; new
* archive formats are added by writing another subclass rather than modifying existing ones.
* The entry-streaming engine ([[readEntries]]) is shared across archive formats -- tar and zip
* differ only in how the container stream is opened, and both use commons-compress
* `ArchiveInputStream`. A concrete subclass implements only [[openArchiveStream]]. Obtain the
* reader for a path via `ArchiveReader(path)`, which selects the implementation by file
* extension; new archive formats are added by writing another subclass rather than modifying the
* shared engine.
*/
abstract class ArchiveReader(path: Path) {

/**
 * Opens the archive at `path` as a commons-compress stream, transparently handling its
 * compression. This is the one member a concrete reader has to implement: the shared
 * [[readEntries]] engine steps through the entries via `getNextEntry`, so a subclass only
 * chooses the container type (e.g. tar vs zip).
 *
 * [[readEntries]] owns the returned stream and closes it once the entries run out, when the
 * iterator it returns is closed, when the enclosing task completes, or when opening the first
 * entry fails.
 *
 * @param conf Hadoop configuration used to open `path`
 * @return the opened archive stream
 */
protected def openArchiveStream(conf: Configuration): ArchiveInputStream[_ <: ArchiveEntry]

/**
* Whether an entry is not a real data file and must be skipped: a directory, or a name Spark's
* own file listing would filter out. Applying [[HadoopFSUtils.shouldFilterOutPathName]] (the
* `InMemoryFileIndex` filter) with the same effective `ignoredPathSegmentRegex` to every path
* component keeps archive reads in parity with reading the same entries as loose files,
* including when the user supplies a custom `ignoredPathSegmentRegex` option: under the default
* filter, `.`-prefixed sidecars (macOS `._x`, `.DS_Store`), `_`-prefixed markers (`_SUCCESS`,
* `_committed_*`), and anything under a `.`/`_`-prefixed directory (e.g. a leftover
* `_temporary/` from a failed write) are skipped, while data files are kept. The per-component
* check matters because `InMemoryFileIndex` never recurses into such directories, so a
* basename-only filter would read `_temporary/part-0.csv` that a loose-file scan drops.
*/
private def shouldSkipEntry(entry: ArchiveEntry, ignoredPathSegmentRegex: Pattern): Boolean = {
  // True when any non-empty `/`-separated component of the entry name is rejected by the
  // listing filter. Empty components (doubled or trailing slashes) are never tested.
  def hasIgnoredComponent: Boolean =
    entry.getName.split("/").exists { component =>
      component.nonEmpty &&
        HadoopFSUtils.shouldFilterOutPathName(component, ignoredPathSegmentRegex)
    }

  // Directories are rejected first, so their names are never inspected.
  entry.isDirectory || hasIgnoredComponent
}

/**
* Wraps the shared archive stream as a view over exactly the current entry's bytes
* (`ArchiveInputStream.read` returns -1 at the entry boundary). [[CloseShieldInputStream]]
* ignores `close()`, so a parser closing its input does not close the underlying archive; any
* unread remainder of an entry is skipped by `getNextEntry()` when advancing.
*/
private def entryStream(archive: ArchiveInputStream[_ <: ArchiveEntry]): InputStream = {
  // The shield turns `close()` into a no-op on the shared stream, so a parser that closes its
  // input leaves the archive open for the entries that follow.
  val shielded: InputStream = CloseShieldInputStream.wrap(archive)
  shielded
}

/**
* Streams the archive entry by entry, applying `parseEntry` to each non-skipped entry's
* `(name, stream)` and concatenating the results into a single iterator. The next entry is opened
Expand All @@ -67,25 +106,109 @@ abstract class ArchiveReader(path: Path) {
def readEntries[T](
conf: Configuration,
ignoredPathSegmentRegex: Pattern = HadoopFSUtils.defaultIgnoredPathSegmentRegexPattern)(
parseEntry: (String, InputStream) => Iterator[T]): Iterator[T]
parseEntry: (String, InputStream) => Iterator[T]): Iterator[T] = {
val archive = openArchiveStream(conf)
var closed = false

// Closes the archive stream at most once; every later call is a no-op. The close itself is
// best effort: non-fatal failures are deliberately ignored.
def cleanup(): Unit = {
  val firstCall = !closed
  closed = true
  if (firstCall) {
    try {
      archive.close()
    } catch {
      case NonFatal(_) =>
    }
  }
}

Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => cleanup()))

val entries = new Iterator[T] with Closeable {
private var currentIter: Iterator[T] = Iterator.empty
private var done = false

// Move to the next entry whose iterator has elements (releasing each exhausted entry's
// reader and skipping any unread bytes), or mark the stream done once entries run out.
// Advancing here -- driven by `hasNext` -- rather than eagerly after producing a row in
// `next` is essential for parsers that reuse a single mutable row and look ahead on
// `hasNext`: probing the current entry right after returning a row would overwrite that row's
// contents before the caller has copied it.
private def advance(): Unit = {
  while (!done && !currentIter.hasNext) {
    // Release the exhausted entry's parser before moving on; close failures are ignored.
    currentIter match {
      case c: Closeable => try c.close() catch { case NonFatal(_) => }
      case _ =>
    }
    // `getNextEntry` also skips whatever the previous parser left unread.
    var entry = archive.getNextEntry
    while (entry != null && shouldSkipEntry(entry, ignoredPathSegmentRegex)) {
      entry = archive.getNextEntry
    }
    if (entry == null) {
      done = true
      cleanup()
    } else if (!archive.canReadEntryData(entry)) {
      // Review note: `canReadEntryData` is false for entries the stream cannot decode while
      // streaming -- for zip, a STORED entry with a trailing data descriptor, an unsupported
      // compression method, or encryption. Checking it before handing the entry to
      // `parseEntry` makes that limitation fail deterministically with a clear message,
      // instead of depending on how the underlying stream behaves when such an entry is read.
      // The archive is closed first so callers without a task (driver side) do not leak it,
      // and the iterator is marked done so a retry cannot silently skip the entry.
      val entryName = entry.getName
      done = true
      cleanup()
      throw new java.io.IOException(
        s"Cannot read entry '$entryName' of archive $path: it uses a feature that cannot be " +
          "streamed (for example encryption, an unsupported compression method, or a stored " +
          "zip entry whose size is recorded only in a trailing data descriptor)")
    } else {
      currentIter = parseEntry(entry.getName, entryStream(archive))
    }
  }
}

override def hasNext: Boolean = {
  // All entry switching happens here rather than in `next`, so a row that was just returned
  // is not disturbed until the caller asks for more.
  advance()
  if (done) false else currentIter.hasNext
}

override def next(): T = {
  // `hasNext` positions `currentIter` on an entry that still has elements, or reports the end.
  if (hasNext) {
    currentIter.next()
  } else {
    throw new NoSuchElementException
  }
}

override def close(): Unit = {
  done = true
  // Release the in-flight entry's parser the same way `advance` releases an exhausted one, so
  // closing mid-entry does not strand a `Closeable` parser. Failures are ignored: this is a
  // best-effort teardown and the archive stream below must still be closed. The parser may
  // already have been closed by `advance`; `Closeable.close` is specified to be idempotent.
  currentIter match {
    case c: Closeable => try c.close() catch { case NonFatal(_) => }
    case _ =>
  }
  currentIter = Iterator.empty
  cleanup()
}
}

// Open the first entry eagerly so the construction cost (and any failure) surfaces here rather
// than at the first `hasNext`. A corrupt archive throws before the caller ever holds the
// iterator, leaving it nothing to close: executors release the stream through the task-
// completion listener, but driver-side callers (e.g. Avro's header-only schema inference) have
// no task, so close it here before propagating.
try {
entries.hasNext
} catch {
case NonFatal(e) =>
cleanup()
throw e
}
entries
}
}

object ArchiveReader {

/**
* Whether `path` names an archive this reader can stream. Dispatched purely on the file
* extension -- `.tar`, `.tar.gz`, or `.tgz` -- since the bytes are not inspected here.
* extension -- `.tar`, `.tar.gz`, `.tgz`, or `.zip` -- since the bytes are not inspected here.
*/
def isArchivePath(path: Path): Boolean = {
  // Only the lower-cased file name is examined, so matching is case-insensitive and
  // independent of the file's contents.
  val lowered = path.getName.toLowerCase(Locale.ROOT)
  Seq(".tar", ".tar.gz", ".tgz", ".zip").exists(suffix => lowered.endsWith(suffix))
}

/**
* Returns the [[ArchiveReader]] implementation for `path`, selected by its file extension. Only
* paths for which [[isArchivePath]] is true are supported; new archive formats add a case here.
*/
def apply(path: Path): ArchiveReader = new TarArchiveReader(path)
def apply(path: Path): ArchiveReader = {
  // Dispatch on the lower-cased file name: tar-family suffixes first, then zip; anything else
  // is rejected.
  val lowered = path.getName.toLowerCase(Locale.ROOT)
  val isTar = Seq(".tar", ".tar.gz", ".tgz").exists(suffix => lowered.endsWith(suffix))
  if (isTar) {
    new TarArchiveReader(path)
  } else if (lowered.endsWith(".zip")) {
    new ZipArchiveReader(path)
  } else {
    throw new IllegalArgumentException(
      s"$path is not a supported archive (expected .tar, .tar.gz, .tgz, or .zip)")
  }
}

/**
* Splits one already-decompressed archive entry's bytes into lines. The reusable, format-agnostic
Expand Down Expand Up @@ -149,26 +272,8 @@ class TarArchiveReader(path: Path) extends ArchiveReader(path) {
// True when the archive's file name ends in `.tgz`, compared case-insensitively.
private def needsExplicitGunzip: Boolean = {
  val lowered = path.getName.toLowerCase(Locale.ROOT)
  lowered.endsWith(".tgz")
}

/**
* Whether an entry is not a real data file and must be skipped: a directory, or a name Spark's
* own file listing would filter out. Applying [[HadoopFSUtils.shouldFilterOutPathName]] (the
* `InMemoryFileIndex` filter) with the same effective `ignoredPathSegmentRegex` to every path
* component keeps archive reads in parity with reading the same entries as loose files,
* including when the user supplies a custom `ignoredPathSegmentRegex` option: under the default
* filter, `.`-prefixed sidecars (macOS `._x`, `.DS_Store`), `_`-prefixed markers (`_SUCCESS`,
* `_committed_*`), and anything under a `.`/`_`-prefixed directory (e.g. a leftover
* `_temporary/` from a failed write) are skipped, while data files are kept. The per-component
* check matters because `InMemoryFileIndex` never recurses into such directories, so a
* basename-only filter would read `_temporary/part-0.csv` that a loose-file scan drops.
*/
private def shouldSkipEntry(entry: TarArchiveEntry, ignoredPathSegmentRegex: Pattern): Boolean = {
if (entry.isDirectory) return true
entry.getName.split("/").exists(c =>
c.nonEmpty && HadoopFSUtils.shouldFilterOutPathName(c, ignoredPathSegmentRegex))
}

/** Opens the archive as a tar stream, transparently decompressing `.tar.gz` / `.tgz`. */
private def openTarStream(conf: Configuration): TarArchiveInputStream = {
override protected def openArchiveStream(
conf: Configuration): ArchiveInputStream[_ <: ArchiveEntry] = {
val base = CodecStreams.createInputStreamWithCloseResource(conf, path)
try {
// GZIPInputStream reads the gzip header in its constructor, so a corrupt archive can throw
Expand All @@ -181,90 +286,19 @@ class TarArchiveReader(path: Path) extends ArchiveReader(path) {
throw e
}
}
}

/**
* Wraps the shared tar stream as a view over exactly the current entry's bytes
* (`TarArchiveInputStream.read` returns -1 at the entry boundary). [[CloseShieldInputStream]]
* ignores `close()`, so a parser closing its input does not close the underlying archive; any
* unread remainder of an entry is skipped by `getNextEntry()` when advancing.
*/
private def entryStream(tar: TarArchiveInputStream): InputStream =
CloseShieldInputStream.wrap(tar)

override def readEntries[T](
conf: Configuration,
ignoredPathSegmentRegex: Pattern)(
parseEntry: (String, InputStream) => Iterator[T]): Iterator[T] = {
val tar = openTarStream(conf)
var closed = false

def cleanup(): Unit = {
if (!closed) {
closed = true
try tar.close() catch { case NonFatal(_) => }
}
}

Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => cleanup()))

val entries = new Iterator[T] with Closeable {
private var currentIter: Iterator[T] = Iterator.empty
private var done = false

// Move to the next entry whose iterator has elements (releasing each exhausted entry's
// reader and skipping any unread bytes), or mark the stream done once entries run out.
// Advancing here -- driven by `hasNext` -- rather than eagerly after producing a row in
// `next` is essential for parsers that reuse a single mutable row and look ahead on
// `hasNext`: probing the current entry right after returning a row would overwrite that row's
// contents before the caller has copied it.
private def advance(): Unit = {
while (!done && !currentIter.hasNext) {
currentIter match {
case c: Closeable => try c.close() catch { case NonFatal(_) => }
case _ =>
}
var entry = tar.getNextEntry
while (entry != null && shouldSkipEntry(entry, ignoredPathSegmentRegex)) {
entry = tar.getNextEntry
}
if (entry == null) {
done = true
cleanup()
} else {
currentIter = parseEntry(entry.getName, entryStream(tar))
}
}
}

override def hasNext: Boolean = {
advance()
!done && currentIter.hasNext
}

override def next(): T = {
if (!hasNext) throw new NoSuchElementException
currentIter.next()
}

override def close(): Unit = {
done = true
currentIter = Iterator.empty
cleanup()
}
}
/**
* [[ArchiveReader]] for zip archives (`.zip`). Each entry is individually compressed inside the
* container (the container itself is not gzip-wrapped), so `ZipArchiveInputStream` decompresses
* entries as they are streamed and no Hadoop codec layer is applied. The stream reads local file
* headers sequentially rather than the central directory, matching the tar reader's pure-streaming
* model: a few unusual zips (e.g. a stored entry whose size is recorded only in a trailing data
* descriptor) are not streamable this way.
*/
class ZipArchiveReader(path: Path) extends ArchiveReader(path) {

// Open the first entry eagerly so the construction cost (and any failure) surfaces here rather
// than at the first `hasNext`. A corrupt archive throws before the caller ever holds the
// iterator, leaving it nothing to close: executors release the stream through the task-
// completion listener, but driver-side callers (e.g. Avro's header-only schema inference) have
// no task, so close it here before propagating.
try {
entries.hasNext
} catch {
case NonFatal(e) =>
cleanup()
throw e
}
entries
}
override protected def openArchiveStream(
    conf: Configuration): ArchiveInputStream[_ <: ArchiveEntry] = {
  // Open the file first, then layer the zip reader directly on top of it.
  val raw = CodecStreams.createInputStreamWithCloseResource(conf, path)
  new ZipArchiveInputStream(raw)
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "not streamable" limitation this ZipArchiveReader documents has no test pinning its behavior. ZipArchiveTestUtils.writeArchive uses ZipArchiveOutputStream (good — an independent producer, distinct from the reader) and there's a writeCorruptArchive negative case, but nothing exercises a valid-but-non-streamable zip. Consider adding a fixture (e.g. a STORED entry written with a data descriptor) that asserts the chosen behavior, so a future change can't silently turn the canReadEntryData gap above into corrupt rows.

}
Loading