Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ class UnivocityGenerator(
(getter, ordinal) =>
timestampNTZFormatter.format(DateTimeUtils.microsToLocalDateTime(getter.getLong(ordinal)))

case t: TimestampNTZNanosType =>
(getter, ordinal) =>
timestampNTZFormatter.formatWithoutTimeZoneNanos(
getter.getTimestampNTZNanos(ordinal), t.precision)

case t: TimestampLTZNanosType =>
(getter, ordinal) =>
timestampFormatter.formatNanos(getter.getTimestampLTZNanos(ordinal), t.precision)

case _: TimeType => (getter, ordinal) => timeFormatter.format(getter.getLong(ordinal))

case YearMonthIntervalType(start, end) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,16 @@ class UnivocityParser(
timestampNTZFormatter.parseWithoutTimeZone(datum, false)
}

case t: TimestampNTZNanosType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
timestampNTZFormatter.parseWithoutTimeZoneNanos(datum, t.precision, false)
}

case t: TimestampLTZNanosType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
timestampFormatter.parseNanos(datum, t.precision)
}

case _: TimeType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
timeFormatter.parse(datum)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,6 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister {

case _: GeometryType | _: GeographyType => false

// Nanosecond-capable timestamps are not yet supported by this datasource.
case _: AnyTimestampNanoType => false

case _: AtomicType => true

case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuild
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{AnyTimestampNanoType, AtomicType, DataType, GeographyType, GeometryType, StructType, UserDefinedType}
import org.apache.spark.sql.types.{AtomicType, DataType, GeographyType, GeometryType, StructType, UserDefinedType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

case class CSVTable(
Expand Down Expand Up @@ -63,9 +63,6 @@ case class CSVTable(
override def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: GeometryType | _: GeographyType => false

// Nanosecond-capable timestamps are not yet supported by this datasource.
case _: AnyTimestampNanoType => false

case _: AtomicType => true

case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package org.apache.spark.sql

import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
import java.time.{Duration, LocalDateTime, Period}
import java.time.{Duration, LocalDateTime, Period, ZoneOffset}
import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.spark.{SparkException, SparkRuntimeException,
SparkUnsupportedOperationException, SparkUpgradeException}
import org.apache.spark.sql.catalyst.util.TimestampNanosTestUtils
import org.apache.spark.sql.catalyst.util.TimestampNanosTestUtils.foreachNanosPrecision
import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
import org.apache.spark.sql.functions._
Expand All @@ -50,8 +51,12 @@ class CsvFunctionsSuite extends SharedSparkSession {

test("SPARK-57164: from_csv with a nanos timestamp DDL schema string") {
val df = Seq("2020-01-01T00:00:00.123456789").toDF("value")
withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
// Fix the session timezone so the TIMESTAMP_LTZ expected value is deterministic.
withSQLConf(
SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
foreachNanosPrecision { p =>
val nano = TimestampNanosTestUtils.nanoOfSecTruncator(p)(123456789)
Seq(
s"TIMESTAMP_NTZ($p)" -> TimestampNTZNanosType(p),
s"TIMESTAMP_LTZ($p)" -> TimestampLTZNanosType(p),
Expand All @@ -62,12 +67,14 @@ class CsvFunctionsSuite extends SharedSparkSession {
from_csv($"value", lit(s"c $spelling"), Map.empty[String, String].asJava).as("v"))
// The schema string resolves to the nanos type ...
assert(parsed.schema("v").dataType.asInstanceOf[StructType]("c").dataType === expected)
// ... but the CSV datasource does not support nanosecond timestamps yet, so the
// value converter rejects it at execution.
checkError(
exception = intercept[SparkUnsupportedOperationException](parsed.collect()),
condition = "UNSUPPORTED_DATATYPE",
parameters = Map("typeName" -> toSQLType(expected)))
// ... and the CSV datasource correctly parses the nanosecond timestamp, truncating
// sub-precision digits toward zero.
val expectedValue = expected match {
case _: TimestampNTZNanosType => LocalDateTime.of(2020, 1, 1, 0, 0, 0, nano)
case _: TimestampLTZNanosType =>
LocalDateTime.of(2020, 1, 1, 0, 0, 0, nano).toInstant(ZoneOffset.UTC)
}
checkAnswer(parsed, Row(Row(expectedValue)))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,7 @@ class FileBasedDataSourceSuite extends SharedSparkSession

test("SPARK-57166: nanosecond timestamp types are not supported in selected file data sources") {
// Parquet and ORC support nanosecond-capable timestamps, while these formats still reject them.
val unsupportedDataSources = Seq("json", "csv", "xml")
val unsupportedDataSources = Seq("json", "xml")
val nanosTypes = Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(9))
withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
// Test both v1 and v2 data sources.
Expand Down Expand Up @@ -1431,6 +1431,48 @@ class FileBasedDataSourceSuite extends SharedSparkSession
}
}

test("SPARK-57457: CSV supports nanosecond timestamp types in v1 and v2") {
withSQLConf(SQLConf.TIMESTAMP_NANOS_TYPES_ENABLED.key -> "true") {
Seq(true, false).foreach { useV1 =>
val useV1List = if (useV1) "csv" else ""
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
foreachNanosPrecision { precision =>
// CSV is text-based: the format string must carry enough fractional-second digits to
// represent the full precision. Use exactly `precision` S-characters so the emitted
// string is compact and unambiguously round-trips at the given precision.
val fracPat = "S" * precision
Seq(TimestampNTZNanosType(precision), TimestampLTZNanosType(precision)).foreach {
nanosType =>
withTempDir { dir =>
val wallClock = LocalDateTime.of(1970, 1, 1, 0, 20, 34, 567890123)
val value: Any = nanosType match {
case _: TimestampNTZNanosType => wallClock
case _: TimestampLTZNanosType => wallClock.toInstant(ZoneOffset.UTC)
}
val df = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(Row(value))),
new StructType().add("ts", nanosType))
val path = new File(dir, s"csv_nanos_${nanosType.typeName}").getCanonicalPath
val (fmtKey, fmtVal) = nanosType match {
case _: TimestampNTZNanosType =>
("timestampNTZFormat", s"yyyy-MM-dd'T'HH:mm:ss.$fracPat")
case _: TimestampLTZNanosType =>
("timestampFormat", s"yyyy-MM-dd'T'HH:mm:ss.${fracPat}XXX")
}
df.write.format("csv").option(fmtKey, fmtVal).mode("overwrite").save(path)
val readBack = spark.read
.schema(new StructType().add("ts", nanosType))
.option(fmtKey, fmtVal)
.format("csv").load(path)
checkAnswer(readBack, df)
}
}
}
}
}
}
}

// Asserts the ignoredPathSegmentRegex contract for `format`: the default regex hides the
// '_'-prefixed file; a never-matching per-read option or session conf each surface it; the
// option overrides the conf.
Expand Down