Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 152 additions & 36 deletions connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}

import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkRuntimeException, SparkThrowable, SparkUpgradeException}
import org.apache.spark.{SPARK_VERSION_SHORT, SparkArithmeticException, SparkConf, SparkException, SparkRuntimeException, SparkThrowable, SparkUpgradeException}
import org.apache.spark.TestUtils.assertExceptionMsg
import org.apache.spark.sql._
import org.apache.spark.sql.TestingUDT.IntervalData
import org.apache.spark.sql.avro.AvroCompressionCodec._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA, UTC}
Expand All @@ -52,7 +52,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.v2.avro.AvroScan
import org.apache.spark.unsafe.types.TimestampNanosVal
import org.apache.spark.util.Utils

abstract class AvroSuite
Expand Down Expand Up @@ -3192,41 +3191,158 @@ abstract class AvroSuite
}
}

test("SPARK-57166: nanosecond timestamp types are not supported in Avro") {
val nanosTypes = Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(9))
test("SPARK-57459: nanosecond timestamp types round-trip through Avro (v1 and v2)") {
withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
nanosTypes.foreach { nanosType =>
val expectedType = s""""${nanosType.sql}""""
withTempDir { dir =>
// Write path: a nanos-typed column cannot be written. The nanos literal is built
// directly from its internal value to avoid relying on cast/parser support.
val nanosLiteral = Literal.create(new TimestampNanosVal(0L, 0.toShort), nanosType)
val df = spark.range(1).select(Column(nanosLiteral).as("ts"))
val writeDir = new File(dir, "write").getCanonicalPath
checkError(
exception = intercept[AnalysisException] {
df.write.format("avro").mode("overwrite").save(writeDir)
},
condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
parameters = Map(
"columnName" -> "`ts`",
"columnType" -> expectedType,
"format" -> "Avro"))

// Read path: a user-specified nanos schema is rejected. Write a benign file first
// so schema validation (not file listing) is what fails.
val readDir = new File(dir, "read").getCanonicalPath
Seq("a").toDF("ts").write.format("avro").mode("overwrite").save(readDir)
Seq(true, false).foreach { useV1 =>
val useV1List = if (useV1) "avro" else ""
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
Seq(7, 8, 9).foreach { precision =>
Seq(TimestampNTZNanosType(precision), TimestampLTZNanosType(precision)).foreach {
nanosType =>
withTempDir { dir =>
// Build the row from an external java.time value; the column schema carries the
// precision and truncates the sub-microsecond digits, matching the ORC suites.
val wallClock = LocalDateTime.of(1970, 1, 1, 0, 20, 34, 567890123)
val value: Any = nanosType match {
case _: TimestampNTZNanosType => wallClock
case _: TimestampLTZNanosType => wallClock.toInstant(java.time.ZoneOffset.UTC)
}
val df = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(Row(value), Row(null))),
new StructType().add("ts", nanosType))
val path = new File(dir, s"avro_nanos_${nanosType.typeName}").getCanonicalPath
df.write.format("avro").mode("overwrite").save(path)
// The inferred schema preserves the declared precision via the catalyst prop.
val inferred = spark.read.format("avro").load(path)
assert(inferred.schema("ts").dataType == nanosType)
val readBack = spark.read.schema(new StructType().add("ts", nanosType))
.format("avro").load(path)
checkAnswer(readBack, df)
}
}
}
}
}
}
}

test("SPARK-57459: nanosecond timestamps are written as unit-correct epoch-nanos") {
  withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
    // The sample value sits 567890123 ns after the epoch. The map lists the epoch-nanos expected
    // in the file once the sub-microsecond digits are truncated to each declared precision.
    val wallClock = LocalDateTime.of(1970, 1, 1, 0, 0, 0, 567890123)
    val expectedNanos = Map(7 -> 567890100L, 8 -> 567890120L, 9 -> 567890123L)
    val variants = Seq(false -> "timestamp-nanos", true -> "local-timestamp-nanos")
    for ((isNtz, logicalName) <- variants; p <- Seq(7, 8, 9)) {
      withTempPath { dir =>
        val nanosType: DataType =
          if (isNtz) TimestampNTZNanosType(p) else TimestampLTZNanosType(p)
        // NTZ takes the wall clock as-is; LTZ takes the same wall clock read as a UTC instant.
        val value: Any = if (isNtz) wallClock else wallClock.toInstant(java.time.ZoneOffset.UTC)
        val schema = new StructType().add("t", nanosType)
        val rows = spark.sparkContext.parallelize(Seq(Row(value)), numSlices = 1)
        spark.createDataFrame(rows, schema).write.format("avro").save(dir.toString)

        // Inspect the output with the plain Avro reader rather than reading it back via Spark,
        // so the assertions see the raw logical type and the raw stored long.
        val avroFile = dir.listFiles().filter(f => f.isFile && f.getName.endsWith("avro")).head
        val reader =
          new DataFileReader[GenericRecord](avroFile, new GenericDatumReader[GenericRecord]())
        try {
          val fieldSchema = reader.getSchema.getField("t").schema()
          // If the field was written as a union, the logical type lives on its long branch.
          val tsSchema = fieldSchema.getType match {
            case Type.UNION => fieldSchema.getTypes.asScala.find(_.getType == Type.LONG).get
            case _ => fieldSchema
          }
          assert(tsSchema.getLogicalType.getName == logicalName,
            s"$nanosType should be written with the $logicalName logical type")
          assert(reader.hasNext)
          val stored = reader.next().get("t").asInstanceOf[Long]
          assert(stored == expectedNanos(p),
            s"$nanosType should store epoch-nanos ${expectedNanos(p)}, but was $stored")
        } finally {
          reader.close()
        }
      }
    }
  }
}

test("SPARK-57459: nanosecond timestamps read from a plain Avro file (no catalyst prop)") {
  withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
    // Mimic a file produced by an external tool: a long carrying a nanos logical type and no
    // `spark.sql.catalyst.type` property. The reader is expected to pick precision 9 and turn
    // the stored epoch-nanoseconds back into the matching java.time value.
    val epochNanos = 567890123L
    val cases = Seq(
      ("timestamp-nanos", TimestampLTZNanosType(9),
        java.time.Instant.ofEpochSecond(0L, epochNanos)),
      ("local-timestamp-nanos", TimestampNTZNanosType(9),
        LocalDateTime.of(1970, 1, 1, 0, 0, 0, epochNanos.toInt)))
    for ((logicalName, expectedType, expectedValue) <- cases) {
      withTempDir { dir =>
        val avroSchema = new Schema.Parser().parse(
          s"""
            |{
            | "type": "record",
            | "name": "top",
            | "fields": [
            | {"name": "t", "type": {"type": "long", "logicalType": "$logicalName"}}
            | ]
            |}
          """.stripMargin)
        val avroFile = new File(dir, "external.avro")
        // Write a single record holding the raw epoch-nanos long with the plain Avro writer.
        val dataFileWriter =
          new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](avroSchema))
        dataFileWriter.create(avroSchema, avroFile)
        try {
          val record = new GenericData.Record(avroSchema)
          record.put("t", epochNanos)
          dataFileWriter.append(record)
        } finally {
          dataFileWriter.close()
        }

        // No schema is supplied on read, so both the type and the value come from inference.
        val readDf = spark.read.format("avro").load(dir.toString)
        assert(readDf.schema("t").dataType == expectedType)
        checkAnswer(readDf, Row(expectedValue))
      }
    }
  }
}

test("SPARK-57459: writing an out-of-range nanosecond timestamp fails loudly") {
withSQLConf(
SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
Seq("TIMESTAMP_NTZ", "TIMESTAMP_LTZ").foreach { typeName =>
withTempPath { dir =>
// Year 9999 is far outside the signed-int64 epoch-nanos range (~2262).
val df = spark.sql(s"SELECT $typeName '9999-12-31 23:59:59.999999999' AS ts")
val e = intercept[SparkException] {
df.write.format("avro").save(dir.getCanonicalPath)
}
var cause: Throwable = e
while (cause != null && !cause.isInstanceOf[SparkArithmeticException]) {
cause = cause.getCause
}
assert(cause != null,
s"Expected a DATETIME_OVERFLOW error for $typeName, but got: ${e.getMessage}")
// NTZ renders without a zone; LTZ renders as a UTC instant with a trailing `Z`.
val renderedValue =
if (typeName == "TIMESTAMP_NTZ") "9999-12-31T23:59:59.999999999"
else "9999-12-31T23:59:59.999999999Z"
checkError(
exception = intercept[AnalysisException] {
spark.read.schema(new StructType().add("ts", nanosType))
.format("avro").load(readDir).collect()
},
condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
parameters = Map(
"columnName" -> "`ts`",
"columnType" -> expectedType,
"format" -> "Avro"))
exception = cause.asInstanceOf[SparkArithmeticException],
condition = "DATETIME_OVERFLOW",
parameters = Map("operation" ->
(s"write the timestamp value $renderedValue as Avro epoch-nanoseconds " +
"(supported range: 1677-09-21T00:12:43.145224192Z to " +
"2262-04-11T23:47:16.854775807Z)")))
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test coverage is a bit narrower than ORC/Parquet; for example, we're missing cases like:

  • LTZ behavior across session time-zone changes
  • min/max documented range round-trip (e.g. 0001-01-01 … 9999-12-31)
  • nested/complex types (struct/array/map)

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,36 @@ object DateTimeUtils extends SparkDateTimeUtils {
value.nanosWithinMicro.toLong)
}

/**
 * Converts a [[TimestampNanosVal]] to one int64 of epoch-nanoseconds on behalf of a `sink` that
 * stores timestamps in that encoding (the Parquet INT64 and Avro `timestamp-nanos` /
 * `local-timestamp-nanos` physical types).
 *
 * Delegates to the single-argument [[timestampNanosToEpochNanos]] and replaces the
 * `ArithmeticException` it raises on int64 overflow with a `DATETIME_OVERFLOW` error.
 *
 * @param value the nanosecond timestamp to pack
 * @param isNtz how the offending value is rendered in the overflow error: `true` for a
 *              zone-less local date-time, `false` for a UTC instant
 * @param sink  name of the target format, reported in the overflow error
 * @return the epoch-nanoseconds value
 */
def timestampNanosToEpochNanos(value: TimestampNanosVal, isNtz: Boolean, sink: String): Long =
  try timestampNanosToEpochNanos(value)
  catch {
    case _: ArithmeticException =>
      throw QueryExecutionErrors.timestampNanosEpochNanosOverflowError(value, isNtz, sink)
  }

/**
 * Inverse of [[timestampNanosToEpochNanos]]: splits one int64 of nanoseconds since the epoch
 * (the form used by the Arrow nanosecond timestamp vectors and the Parquet / Avro INT64
 * epoch-nanoseconds encodings) into a [[TimestampNanosVal]].
 *
 * The split uses `floorDiv` / `floorMod`, so the sub-microsecond remainder is non-negative for
 * pre-epoch (negative) inputs as well. The remainder is then truncated to `precision` digits.
 *
 * @param epochNanos nanoseconds since the epoch
 * @param precision  fractional-second precision, in [7, 9]
 * @return the unpacked, precision-truncated value
 */
def epochNanosToTimestampNanos(epochNanos: Long, precision: Int): TimestampNanosVal = {
  val micros = Math.floorDiv(epochNanos, NANOS_PER_MICROS)
  val subMicroNanos = Math.floorMod(epochNanos, NANOS_PER_MICROS).toInt
  TimestampNanosVal.fromParts(
    micros,
    truncateNanosWithinMicroToPrecision(subMicroNanos, precision).toShort)
}

/**
* Adds a full interval (months, days, microseconds) to a timestamp represented as the number of
* microseconds since 1970-01-01 00:00:00Z.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,52 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
}
}

test("SPARK-57459: epochNanosToTimestampNanos unpacks int64 epoch-nanoseconds") {
  // Asserts that `epochNanos`, decoded at `precision`, yields (epochMicros, nanosWithinMicro).
  def assertDecodes(epochNanos: Long, precision: Int)(
      epochMicros: Long, nanosWithinMicro: Int): Unit = {
    val expected = TimestampNanosVal.fromParts(epochMicros, nanosWithinMicro.toShort)
    assert(epochNanosToTimestampNanos(epochNanos, precision) === expected)
  }

  // Precision 9 keeps every digit, making the decode the exact inverse of the encode.
  assertDecodes(0L, 9)(0L, 0)
  assertDecodes(999L, 9)(0L, 999)
  assertDecodes(NANOS_PER_MICROS, 9)(1L, 0)
  assertDecodes(1234567L * NANOS_PER_MICROS + 7L, 9)(1234567L, 7)
  // Negative inputs are floored, so the sub-microsecond part stays within [0, 999].
  assertDecodes(-1L, 9)(-1L, 999)
  assertDecodes(-NANOS_PER_MICROS, 9)(-1L, 0)

  // Each step down in precision drops one more trailing sub-microsecond digit. The truncation is
  // applied after flooring, so the pre-epoch input behaves the same way
  // (-123456211 -> floor (-123457, 789) -> truncate the 789).
  Seq(9 -> 789, 8 -> 780, 7 -> 700).foreach { case (precision, truncated) =>
    assertDecodes(123456789L, precision)(123456L, truncated)
  }
  Seq(9 -> 789, 8 -> 780, 7 -> 700).foreach { case (precision, truncated) =>
    assertDecodes(-123456211L, precision)(-123457L, truncated)
  }

  // Both ends of the int64 range decode without overflowing.
  assertDecodes(Long.MaxValue, 9)(9223372036854775L, 807)
  assertDecodes(Long.MinValue, 9)(-9223372036854776L, 192)

  // Encode(decode(x)) == x at full precision. Long.MinValue is left out on purpose: its decoded
  // form (-9223372036854776, 192) goes through an intermediate epochMicros * 1000 on the way
  // back that overflows Long, a pre-existing limit of the multiplyExact-based pack path.
  val roundTripInputs = Seq(
    0L, 999L, 1234567L * NANOS_PER_MICROS + 7L, -1234567L * NANOS_PER_MICROS + 13L,
    Long.MaxValue)
  for (epochNanos <- roundTripInputs) {
    val decoded = epochNanosToTimestampNanos(epochNanos, 9)
    assert(timestampNanosToEpochNanos(decoded) === epochNanos)
  }

  // A precision outside [7, 9] breaks an internal contract and must surface as INTERNAL_ERROR.
  checkError(
    exception = intercept[SparkException] {
      epochNanosToTimestampNanos(123456789L, 6)
    },
    condition = "INTERNAL_ERROR",
    parameters = Map("message" -> "Fractional second precision 6 is out of range [7, 9]."))
}

test("SPARK-34903: subtract timestamps") {
DateTimeTestUtils.outstandingZoneIds.foreach { zid =>
Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,28 @@ private[sql] class AvroDeserializer(
s"Avro logical type $other cannot be converted to SQL type ${TimeType().sql}.")
}

case (LONG, t: TimestampLTZNanosType) => avroType.getLogicalType match {
// The timestamp-nanos logical type stores epoch-nanoseconds (Long), while the value is
// represented internally as (epochMicros, nanosWithinMicro). Floor semantics keep
// nanosWithinMicro in [0, 999] for pre-epoch values. Nanos timestamps are always proleptic
// Gregorian, so they are exempt from datetime rebasing.
case _: LogicalTypes.TimestampNanos => (updater, ordinal, value) =>

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to use fully qualified names in these cases? Same for AvroSerializer?

updater.set(ordinal,
DateTimeUtils.epochNanosToTimestampNanos(value.asInstanceOf[Long], t.precision))
case other => throw new IncompatibleSchemaException(errorPrefix +
s"Avro logical type $other cannot be converted to SQL type " +
s"${TimestampLTZNanosType().sql}.")
}

case (LONG, t: TimestampNTZNanosType) => avroType.getLogicalType match {
case _: LogicalTypes.LocalTimestampNanos => (updater, ordinal, value) =>
updater.set(ordinal,
DateTimeUtils.epochNanosToTimestampNanos(value.asInstanceOf[Long], t.precision))
case other => throw new IncompatibleSchemaException(errorPrefix +
s"Avro logical type $other cannot be converted to SQL type " +
s"${TimestampNTZNanosType().sql}.")
}

// Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date.
// For backward compatibility, we still keep this conversion.
case (LONG, DateType) => (updater, ordinal, value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,26 @@ private[sql] class AvroSerializer(
s"SQL type ${TimeType().sql} cannot be converted to Avro logical type $other")
}

case (_: TimestampLTZNanosType, LONG) => avroType.getLogicalType match {
// Nanosecond-precision timestamps are stored as epoch-nanoseconds (Long). They are always
// proleptic Gregorian, so they are exempt from datetime rebasing.
case _: LogicalTypes.TimestampNanos => (getter, ordinal) =>
DateTimeUtils.timestampNanosToEpochNanos(
getter.getTimestampLTZNanos(ordinal), isNtz = false, sink = "Avro")
case other => throw new IncompatibleSchemaException(errorPrefix +
s"SQL type ${TimestampLTZNanosType().sql} cannot be converted to " +
s"Avro logical type $other")
}

case (_: TimestampNTZNanosType, LONG) => avroType.getLogicalType match {
case _: LogicalTypes.LocalTimestampNanos => (getter, ordinal) =>
DateTimeUtils.timestampNanosToEpochNanos(
getter.getTimestampNTZNanos(ordinal), isNtz = true, sink = "Avro")
case other => throw new IncompatibleSchemaException(errorPrefix +
s"SQL type ${TimestampNTZNanosType().sql} cannot be converted to " +
s"Avro logical type $other")
}

case (ArrayType(et, containsNull), ARRAY) =>
val elementConverter = newConverter(
et, resolveNullableType(avroType.getElementType, containsNull),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ private[sql] object AvroUtils extends Logging {

case _: GeometryType | _: GeographyType => false

// Nanosecond-capable timestamps are not yet supported by this datasource.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we also update docs/sql-data-sources-avro.md accordingly, or will you do that in a followup PR?

case _: AnyTimestampNanoType => false

case _: AtomicType => true

case st: StructType => st.forall { f => supportsDataType(f.dataType) }
Expand Down
Loading