diff --git a/python/hatch_build.py b/python/hatch_build.py index d375d65b1..e8cc90bce 100644 --- a/python/hatch_build.py +++ b/python/hatch_build.py @@ -56,10 +56,12 @@ class BuildHook(BuildHookInterface): # C++ source files in ArrowIterator directory CPP_SOURCES = [ "ArrayConverter.cpp", + "ArrowTableConverter.cpp", "BinaryConverter.cpp", "BooleanConverter.cpp", "CArrowIterator.cpp", "CArrowStreamIterator.cpp", + "CArrowStreamTableIterator.cpp", "ConverterUtil.cpp", "DateConverter.cpp", "DecFloatConverter.cpp", diff --git a/python/src/snowflake/connector/_internal/arrow_stream_iterator.pyi b/python/src/snowflake/connector/_internal/arrow_stream_iterator.pyi index 753e82760..a657e63cc 100644 --- a/python/src/snowflake/connector/_internal/arrow_stream_iterator.pyi +++ b/python/src/snowflake/connector/_internal/arrow_stream_iterator.pyi @@ -24,3 +24,17 @@ class ArrowStreamIterator(Iterator[Any]): ) -> None: ... def __iter__(self) -> ArrowStreamIterator: ... def __next__(self) -> Any: ... + +class ArrowStreamTableIterator(Iterator[Any]): + """Iterator that yields one pyarrow RecordBatch per batch with Snowflake type conversions applied.""" + + def __init__( + self, + stream_ptr: int, + arrow_context: Any, + number_to_decimal: bool = False, + force_microsecond_precision: bool = False, + ) -> None: ... + def __iter__(self) -> ArrowStreamTableIterator: ... + def __next__(self) -> Any: ... + def get_converted_schema(self) -> Any: ... 
diff --git a/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/ArrowTableConverter.cpp b/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/ArrowTableConverter.cpp new file mode 100644 index 000000000..d7bf912b4 --- /dev/null +++ b/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/ArrowTableConverter.cpp @@ -0,0 +1,870 @@ +#include "ArrowTableConverter.hpp" + +#include +#include + +#include "Python/Common.hpp" +#include "SnowflakeType.hpp" +#include "Util/time.hpp" + +namespace sf { + +Logger* ArrowTableConverter::logger = new Logger("snowflake.connector.ArrowTableConverter"); + +ArrowTableConverter::ArrowTableConverter(bool number_to_decimal, bool force_microsecond_precision, const std::string& timezone) + : m_convert_number_to_decimal(number_to_decimal), + m_force_microsecond_precision(force_microsecond_precision), + m_timezone(timezone) {} + +// --------------------------------------------------------------------------- +// convertIfNeeded – top-level dispatch on Snowflake logical type +// --------------------------------------------------------------------------- + +void ArrowTableConverter::convertIfNeeded(ArrowSchema* columnSchema, + ArrowArrayView* columnArray) { + ArrowSchemaView columnSchemaView; + ArrowError error; + int returnCode; + + returnCode = ArrowSchemaViewInit(&columnSchemaView, columnSchema, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error initializing " + "ArrowSchemaView: %s, error code: %d", + ArrowErrorMessage(&error), returnCode); + + ArrowStringView snowflakeLogicalType; + const char* metadata = columnSchema->metadata; + returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("logicalType"), + &snowflakeLogicalType); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error getting 'logicalType' " + "from Arrow metadata, error code: %d", + returnCode); + + SnowflakeType::Type st = SnowflakeType::snowflakeTypeFromString( + 
std::string(snowflakeLogicalType.data, snowflakeLogicalType.size_bytes)); + + switch (st) { + case SnowflakeType::Type::FIXED: { + int scale = 0; + ArrowStringView scaleString = ArrowCharView(nullptr); + if (metadata != nullptr) { + returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("scale"), + &scaleString); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error getting 'scale' " + "from Arrow metadata, error code: %d", + returnCode); + scale = std::stoi( + std::string(scaleString.data, scaleString.size_bytes)); + } + if (scale > 0 && + columnSchemaView.type != NANOARROW_TYPE_DECIMAL128) { + convertScaledFixedNumberColumn_nanoarrow(&columnSchemaView, + columnArray, scale); + } + break; + } + + case SnowflakeType::Type::ANY: + case SnowflakeType::Type::BINARY: + case SnowflakeType::Type::BOOLEAN: + case SnowflakeType::Type::CHAR: + case SnowflakeType::Type::DATE: + case SnowflakeType::Type::REAL: + case SnowflakeType::Type::TEXT: + case SnowflakeType::Type::INTERVAL_YEAR_MONTH: + case SnowflakeType::Type::VARIANT: + case SnowflakeType::Type::VECTOR: + break; + + case SnowflakeType::Type::ARRAY: { + switch (columnSchemaView.type) { + case NANOARROW_TYPE_STRING: + break; + case NANOARROW_TYPE_LIST: { + if (columnSchemaView.schema->n_children != 1) { + PyErr_SetString( + PyExc_Exception, + Logger::formatString( + "[Snowflake Exception] invalid arrow schema for array " + "items: expected 1 child, got %d", + columnSchemaView.schema->n_children).c_str()); + break; + } + convertIfNeeded(columnSchemaView.schema->children[0], + columnArray->children[0]); + break; + } + default: + PyErr_SetString( + PyExc_Exception, + Logger::formatString( + "[Snowflake Exception] unknown arrow type(%s) " + "for ARRAY data in %s", + NANOARROW_TYPE_ENUM_STRING[columnSchemaView.type], + columnSchemaView.schema->name).c_str()); + } + break; + } + + case SnowflakeType::Type::MAP: { + if (columnSchemaView.schema->n_children != 1) { + PyErr_SetString( + PyExc_Exception, + 
Logger::formatString( + "[Snowflake Exception] invalid arrow schema for map " + "entries: expected 1 child, got %d", + columnSchemaView.schema->n_children).c_str()); + break; + } + ArrowSchema* entries = columnSchemaView.schema->children[0]; + if (entries->n_children != 2) { + PyErr_SetString( + PyExc_Exception, + Logger::formatString( + "[Snowflake Exception] invalid arrow schema for map " + "key/value: expected 2 entries, got %d", + entries->n_children).c_str()); + break; + } + convertIfNeeded(entries->children[0], + columnArray->children[0]->children[0]); + convertIfNeeded(entries->children[1], + columnArray->children[0]->children[1]); + break; + } + + case SnowflakeType::Type::OBJECT: { + switch (columnSchemaView.type) { + case NANOARROW_TYPE_STRING: + break; + case NANOARROW_TYPE_STRUCT: + for (int i = 0; i < columnSchemaView.schema->n_children; i++) { + convertIfNeeded(columnSchemaView.schema->children[i], + columnArray->children[i]); + } + break; + default: + PyErr_SetString( + PyExc_Exception, + Logger::formatString( + "[Snowflake Exception] unknown arrow type(%s) " + "for OBJECT data in %s", + NANOARROW_TYPE_ENUM_STRING[columnSchemaView.type], + columnSchemaView.schema->name).c_str()); + } + break; + } + + case SnowflakeType::Type::INTERVAL_DAY_TIME: { + int scale = 9; + if (metadata != nullptr) { + ArrowStringView scaleString = ArrowCharView(nullptr); + returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("scale"), + &scaleString); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error getting 'scale', " + "error code: %d", + returnCode); + scale = std::stoi( + std::string(scaleString.data, scaleString.size_bytes)); + } + convertIntervalDayTimeColumn_nanoarrow(&columnSchemaView, columnArray, + scale); + break; + } + + case SnowflakeType::Type::TIME: { + int scale = 9; + if (metadata != nullptr) { + ArrowStringView scaleString = ArrowCharView(nullptr); + returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("scale"), + &scaleString); + 
SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error getting 'scale', " + "error code: %d", + returnCode); + scale = std::stoi( + std::string(scaleString.data, scaleString.size_bytes)); + } + convertTimeColumn_nanoarrow(&columnSchemaView, columnArray, scale); + break; + } + + case SnowflakeType::Type::TIMESTAMP_NTZ: { + int scale = 9; + if (metadata != nullptr) { + ArrowStringView scaleString = ArrowCharView(nullptr); + returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("scale"), + &scaleString); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error getting 'scale', " + "error code: %d", + returnCode); + scale = std::stoi( + std::string(scaleString.data, scaleString.size_bytes)); + } + convertTimestampColumn_nanoarrow(&columnSchemaView, columnArray, scale); + break; + } + + case SnowflakeType::Type::TIMESTAMP_LTZ: { + int scale = 9; + if (metadata != nullptr) { + ArrowStringView scaleString = ArrowCharView(nullptr); + returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("scale"), + &scaleString); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error getting 'scale', " + "error code: %d", + returnCode); + scale = std::stoi( + std::string(scaleString.data, scaleString.size_bytes)); + } + convertTimestampColumn_nanoarrow(&columnSchemaView, columnArray, scale, + m_timezone); + break; + } + + case SnowflakeType::Type::TIMESTAMP_TZ: { + int scale = 9; + int byteLength = 16; + if (metadata != nullptr) { + ArrowStringView scaleString = ArrowCharView(nullptr); + ArrowStringView byteLengthString = ArrowCharView(nullptr); + returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("scale"), + &scaleString); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error getting 'scale', " + "error code: %d", + returnCode); + returnCode = ArrowMetadataGetValue(metadata, + ArrowCharView("byteLength"), + &byteLengthString); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error getting 'byteLength', " + "error code: %d", + 
returnCode); + scale = std::stoi( + std::string(scaleString.data, scaleString.size_bytes)); + if (byteLengthString.data != nullptr) { + byteLength = std::stoi( + std::string(byteLengthString.data, + byteLengthString.size_bytes)); + } + } + convertTimestampTZColumn_nanoarrow(&columnSchemaView, columnArray, + scale, byteLength, m_timezone); + break; + } + + default: + PyErr_SetString( + PyExc_Exception, + Logger::formatString( + "[Snowflake Exception] unknown Snowflake type: %s", + snowflakeLogicalType.data).c_str()); + } +} + +// --------------------------------------------------------------------------- +// Scaled fixed-number conversion +// --------------------------------------------------------------------------- + +template +double ArrowTableConverter::convertScaledFixedNumberToDouble( + unsigned int scale, T originalValue) { + if (scale < 9) { + return static_cast(originalValue) / + sf::internal::powTenSB4[scale]; + } + std::string valStr = std::to_string(originalValue); + int negative = valStr.at(0) == '-' ? 
1 : 0; + unsigned int digits = valStr.length() - negative; + if (digits <= scale) { + int numZeroes = scale - digits + 1; + valStr.insert(negative, std::string(numZeroes, '0')); + } + valStr.insert(valStr.length() - scale, "."); + std::size_t offset = 0; + return std::stod(valStr, &offset); +} + +void ArrowTableConverter::convertScaledFixedNumberColumn_nanoarrow( + ArrowSchemaView* field, ArrowArrayView* columnArray, + unsigned int scale) { + if (m_convert_number_to_decimal) { + convertScaledFixedNumberColumnToDecimalColumn_nanoarrow(field, columnArray, + scale); + } else { + convertScaledFixedNumberColumnToDoubleColumn_nanoarrow(field, columnArray, + scale); + } +} + +void ArrowTableConverter::convertScaledFixedNumberColumnToDecimalColumn_nanoarrow( + ArrowSchemaView* field, ArrowArrayView* columnArray, + unsigned int scale) { + int returnCode = 0; + nanoarrow::UniqueSchema newUniqueField; + nanoarrow::UniqueArray newUniqueArray; + ArrowSchema* newSchema = newUniqueField.get(); + ArrowArray* newArray = newUniqueArray.get(); + ArrowError error; + + ArrowSchemaInit(newSchema); + newSchema->flags &= (field->schema->flags & ARROW_FLAG_NULLABLE); + returnCode = ArrowSchemaSetTypeDecimal(newSchema, NANOARROW_TYPE_DECIMAL128, + 38, scale); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema type " + "decimal, error code: %d", + returnCode); + returnCode = ArrowSchemaSetName(newSchema, field->schema->name); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema name, " + "error code: %d", + returnCode); + + returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error initializing array from " + "schema: %s, error code: %d", + ArrowErrorMessage(&error), returnCode); + returnCode = ArrowArrayStartAppending(newArray); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error starting array appending, " + "error code: %d", + returnCode); + + for (int64_t 
rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { + if (ArrowArrayViewIsNull(columnArray, rowIdx)) { + returnCode = ArrowArrayAppendNull(newArray, 1); + } else { + auto originalVal = ArrowArrayViewGetIntUnsafe(columnArray, rowIdx); + ArrowDecimal arrowDecimal; + ArrowDecimalInit(&arrowDecimal, 128, 38, scale); + ArrowDecimalSetInt(&arrowDecimal, originalVal); + returnCode = ArrowArrayAppendDecimal(newArray, &arrowDecimal); + } + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error appending decimal value, " + "error code: %d", + returnCode); + } + + returnCode = ArrowArrayFinishBuildingDefault(newArray, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error finishing array: %s, " + "error code: %d", + ArrowErrorMessage(&error), returnCode); + field->schema->release(field->schema); + ArrowSchemaMove(newSchema, field->schema); + columnArray->array->release(columnArray->array); + ArrowArrayMove(newArray, columnArray->array); +} + +void ArrowTableConverter::convertScaledFixedNumberColumnToDoubleColumn_nanoarrow( + ArrowSchemaView* field, ArrowArrayView* columnArray, + unsigned int scale) { + int returnCode = 0; + nanoarrow::UniqueSchema newUniqueField; + nanoarrow::UniqueArray newUniqueArray; + ArrowSchema* newSchema = newUniqueField.get(); + ArrowArray* newArray = newUniqueArray.get(); + ArrowError error; + + ArrowSchemaInit(newSchema); + newSchema->flags &= (field->schema->flags & ARROW_FLAG_NULLABLE); + returnCode = ArrowSchemaSetType(newSchema, NANOARROW_TYPE_DOUBLE); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema type double, " + "error code: %d", + returnCode); + returnCode = ArrowSchemaSetName(newSchema, field->schema->name); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema name, " + "error code: %d", + returnCode); + + returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error initializing array 
from " + "schema: %s, error code: %d", + ArrowErrorMessage(&error), returnCode); + + for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { + if (ArrowArrayViewIsNull(columnArray, rowIdx)) { + returnCode = ArrowArrayAppendNull(newArray, 1); + } else { + auto originalVal = ArrowArrayViewGetIntUnsafe(columnArray, rowIdx); + returnCode = ArrowArrayAppendDouble( + newArray, convertScaledFixedNumberToDouble(scale, originalVal)); + } + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error appending double value, " + "error code: %d", + returnCode); + } + + returnCode = ArrowArrayFinishBuildingDefault(newArray, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error finishing array: %s, " + "error code: %d", + ArrowErrorMessage(&error), returnCode); + field->schema->release(field->schema); + ArrowSchemaMove(newSchema, field->schema); + columnArray->array->release(columnArray->array); + ArrowArrayMove(newArray, columnArray->array); +} + +// --------------------------------------------------------------------------- +// Interval Day-Time +// --------------------------------------------------------------------------- + +void ArrowTableConverter::convertIntervalDayTimeColumn_nanoarrow( + ArrowSchemaView* field, ArrowArrayView* columnArray, int scale) { + int returnCode = 0; + nanoarrow::UniqueSchema newUniqueField; + nanoarrow::UniqueArray newUniqueArray; + ArrowSchema* newSchema = newUniqueField.get(); + ArrowArray* newArray = newUniqueArray.get(); + ArrowError error; + + ArrowSchemaInit(newSchema); + newSchema->flags &= (field->schema->flags & ARROW_FLAG_NULLABLE); + returnCode = ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_DURATION, + NANOARROW_TIME_UNIT_NANO, nullptr); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema DateTime, " + "error code: %d", + returnCode); + returnCode = ArrowSchemaSetName(newSchema, field->schema->name); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error 
setting schema name, " + "error code: %d", + returnCode); + + returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error initializing array from " + "schema: %s, error code: %d", + ArrowErrorMessage(&error), returnCode); + returnCode = ArrowArrayStartAppending(newArray); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error starting appending, " + "error code: %d", + returnCode); + + for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { + if (ArrowArrayViewIsNull(columnArray, rowIdx)) { + returnCode = ArrowArrayAppendNull(newArray, 1); + } else { + ArrowDecimal arrowDecimal; + ArrowDecimalInit(&arrowDecimal, 128, 38, 0); + ArrowArrayViewGetDecimalUnsafe(columnArray, rowIdx, &arrowDecimal); + returnCode = + ArrowArrayAppendInt(newArray, ArrowDecimalGetIntUnsafe(&arrowDecimal)); + } + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error appending interval value, " + "error code: %d", + returnCode); + } + + returnCode = ArrowArrayFinishBuildingDefault(newArray, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error finishing array: %s, " + "error code: %d", + ArrowErrorMessage(&error), returnCode); + field->schema->release(field->schema); + ArrowSchemaMove(newSchema, field->schema); + columnArray->array->release(columnArray->array); + ArrowArrayMove(newArray, columnArray->array); +} + +// --------------------------------------------------------------------------- +// Time +// --------------------------------------------------------------------------- + +void ArrowTableConverter::convertTimeColumn_nanoarrow( + ArrowSchemaView* field, ArrowArrayView* columnArray, int scale) { + int returnCode = 0; + nanoarrow::UniqueSchema newUniqueField; + nanoarrow::UniqueArray newUniqueArray; + ArrowSchema* newSchema = newUniqueField.get(); + ArrowArray* newArray = newUniqueArray.get(); + ArrowError error; + + ArrowSchemaInit(newSchema); + newSchema->flags &= 
(field->schema->flags & ARROW_FLAG_NULLABLE); + + int64_t powTenSB4Val = 1; + if (scale == 0) { + returnCode = ArrowSchemaSetTypeDateTime( + newSchema, NANOARROW_TYPE_TIME32, NANOARROW_TIME_UNIT_SECOND, nullptr); + } else if (scale <= 3) { + returnCode = ArrowSchemaSetTypeDateTime( + newSchema, NANOARROW_TYPE_TIME32, NANOARROW_TIME_UNIT_MILLI, nullptr); + powTenSB4Val = sf::internal::powTenSB4[3 - scale]; + } else if (scale <= 6) { + returnCode = ArrowSchemaSetTypeDateTime( + newSchema, NANOARROW_TYPE_TIME64, NANOARROW_TIME_UNIT_MICRO, nullptr); + powTenSB4Val = sf::internal::powTenSB4[6 - scale]; + } else { + returnCode = ArrowSchemaSetTypeDateTime( + newSchema, NANOARROW_TYPE_TIME64, NANOARROW_TIME_UNIT_MICRO, nullptr); + powTenSB4Val = sf::internal::powTenSB4[scale - 6]; + } + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema DateTime, " + "error code: %d", + returnCode); + returnCode = ArrowSchemaSetName(newSchema, field->schema->name); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema name, " + "error code: %d", + returnCode); + + returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error initializing array: %s, " + "error code: %d", + ArrowErrorMessage(&error), returnCode); + returnCode = ArrowArrayStartAppending(newArray); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error starting appending, " + "error code: %d", + returnCode); + + for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { + if (ArrowArrayViewIsNull(columnArray, rowIdx)) { + returnCode = ArrowArrayAppendNull(newArray, 1); + } else { + auto val = ArrowArrayViewGetIntUnsafe(columnArray, rowIdx); + val = (scale <= 6) ? 
(val * powTenSB4Val) : (val / powTenSB4Val); + returnCode = ArrowArrayAppendInt(newArray, val); + } + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error appending time value, " + "error code: %d", + returnCode); + } + + returnCode = ArrowArrayFinishBuildingDefault(newArray, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error finishing array: %s, " + "error code: %d", + ArrowErrorMessage(&error), returnCode); + field->schema->release(field->schema); + ArrowSchemaMove(newSchema, field->schema); + columnArray->array->release(columnArray->array); + ArrowArrayMove(newArray, columnArray->array); +} + +// --------------------------------------------------------------------------- +// Nanosecond timestamp overflow helper +// --------------------------------------------------------------------------- + +static bool _checkNanosecondTimestampOverflowAndDownscale( + ArrowArrayView* columnArray, ArrowArrayView* epochArray, + ArrowArrayView* fractionArray) { + int powTenSB4 = sf::internal::powTenSB4[9]; + for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { + if (!ArrowArrayViewIsNull(columnArray, rowIdx)) { + int64_t epoch = ArrowArrayViewGetIntUnsafe(epochArray, rowIdx); + int64_t fraction = ArrowArrayViewGetIntUnsafe(fractionArray, rowIdx); + if (epoch > (INT64_MAX / powTenSB4) || + epoch < (INT64_MIN / powTenSB4)) { + if (fraction % 1000 != 0) { + std::string errorInfo = Logger::formatString( + "The total number of nanoseconds %d%d overflows int64 range. 
" + "If you use a timestamp with the nanosecond part over 6-digits " + "in the Snowflake database, the timestamp must be between " + "'1677-09-21 00:12:43.145224192' and '2262-04-11 " + "23:47:16.854775807' to not overflow.", + epoch, fraction); + throw std::overflow_error(errorInfo.c_str()); + } + return true; + } + } + } + return false; +} + +// --------------------------------------------------------------------------- +// Timestamp NTZ / LTZ +// --------------------------------------------------------------------------- + +void ArrowTableConverter::convertTimestampColumn_nanoarrow( + ArrowSchemaView* field, ArrowArrayView* columnArray, int scale, + const std::string& timezone) { + int returnCode = 0; + nanoarrow::UniqueSchema newUniqueField; + nanoarrow::UniqueArray newUniqueArray; + ArrowSchema* newSchema = newUniqueField.get(); + ArrowArray* newArray = newUniqueArray.get(); + ArrowError error; + + ArrowSchemaInit(newSchema); + newSchema->flags &= (field->schema->flags & ARROW_FLAG_NULLABLE); + + const char* tz = timezone.empty() ? 
nullptr : timezone.c_str(); + + ArrowArrayView* epochArray = nullptr; + ArrowArrayView* fractionArray = nullptr; + bool has_overflow = m_force_microsecond_precision; + if (!m_force_microsecond_precision && scale > 6 && + field->type == NANOARROW_TYPE_STRUCT) { + for (int64_t i = 0; i < field->schema->n_children; i++) { + const char* name = field->schema->children[i]->name; + if (std::strcmp(name, internal::FIELD_NAME_EPOCH.c_str()) == 0) + epochArray = columnArray->children[i]; + else if (std::strcmp(name, internal::FIELD_NAME_FRACTION.c_str()) == 0) + fractionArray = columnArray->children[i]; + } + has_overflow = _checkNanosecondTimestampOverflowAndDownscale( + columnArray, epochArray, fractionArray); + } + + if (scale <= 6) { + auto timeunit = NANOARROW_TIME_UNIT_SECOND; + int64_t powTenSB4Val = 1; + if (scale == 0) { + timeunit = NANOARROW_TIME_UNIT_SECOND; + } else if (scale <= 3) { + timeunit = NANOARROW_TIME_UNIT_MILLI; + powTenSB4Val = sf::internal::powTenSB4[3 - scale]; + } else { + timeunit = NANOARROW_TIME_UNIT_MICRO; + powTenSB4Val = sf::internal::powTenSB4[6 - scale]; + } + returnCode = ArrowSchemaSetTypeDateTime( + newSchema, NANOARROW_TYPE_TIMESTAMP, timeunit, tz); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema DateTime, " + "error code: %d", + returnCode); + returnCode = ArrowSchemaSetName(newSchema, field->schema->name); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema name, " + "error code: %d", + returnCode); + returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error initializing array: %s, " + "error code: %d", + ArrowErrorMessage(&error), returnCode); + + for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { + if (ArrowArrayViewIsNull(columnArray, rowIdx)) { + returnCode = ArrowArrayAppendNull(newArray, 1); + } else { + int64_t val = + ArrowArrayViewGetIntUnsafe(columnArray, rowIdx) * 
powTenSB4Val; + returnCode = ArrowArrayAppendInt(newArray, val); + } + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error appending timestamp, " + "error code: %d", + returnCode); + } + } else { + auto timeunit = + has_overflow ? NANOARROW_TIME_UNIT_MICRO : NANOARROW_TIME_UNIT_NANO; + returnCode = ArrowSchemaSetTypeDateTime( + newSchema, NANOARROW_TYPE_TIMESTAMP, timeunit, tz); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema DateTime, " + "error code: %d", + returnCode); + returnCode = ArrowSchemaSetName(newSchema, field->schema->name); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema name, " + "error code: %d", + returnCode); + returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error initializing array: %s, " + "error code: %d", + ArrowErrorMessage(&error), returnCode); + + if (field->type == NANOARROW_TYPE_STRUCT) { + epochArray = nullptr; + fractionArray = nullptr; + for (int64_t i = 0; i < field->schema->n_children; i++) { + const char* name = field->schema->children[i]->name; + if (std::strcmp(name, internal::FIELD_NAME_EPOCH.c_str()) == 0) + epochArray = columnArray->children[i]; + else if (std::strcmp(name, internal::FIELD_NAME_FRACTION.c_str()) == 0) + fractionArray = columnArray->children[i]; + } + for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { + if (ArrowArrayViewIsNull(columnArray, rowIdx)) { + returnCode = ArrowArrayAppendNull(newArray, 1); + } else { + int64_t epoch = ArrowArrayViewGetIntUnsafe(epochArray, rowIdx); + int64_t frac = ArrowArrayViewGetIntUnsafe(fractionArray, rowIdx); + int64_t val = has_overflow + ? 
(epoch * sf::internal::powTenSB4[6] + frac / 1000) + : (epoch * sf::internal::powTenSB4[9] + frac); + returnCode = ArrowArrayAppendInt(newArray, val); + } + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error appending timestamp, " + "error code: %d", + returnCode); + } + } else if (field->type == NANOARROW_TYPE_INT64) { + for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { + if (ArrowArrayViewIsNull(columnArray, rowIdx)) { + returnCode = ArrowArrayAppendNull(newArray, 1); + } else { + int64_t val = ArrowArrayViewGetIntUnsafe(columnArray, rowIdx) * + sf::internal::powTenSB4[9 - scale]; + returnCode = ArrowArrayAppendInt(newArray, val); + } + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error appending timestamp, " + "error code: %d", + returnCode); + } + } + } + + returnCode = ArrowArrayFinishBuildingDefault(newArray, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error finishing array: %s, " + "error code: %d", + ArrowErrorMessage(&error), returnCode); + field->schema->release(field->schema); + ArrowSchemaMove(newSchema, field->schema); + columnArray->array->release(columnArray->array); + ArrowArrayMove(newArray, columnArray->array); +} + +// --------------------------------------------------------------------------- +// Timestamp TZ +// --------------------------------------------------------------------------- + +void ArrowTableConverter::convertTimestampTZColumn_nanoarrow( + ArrowSchemaView* field, ArrowArrayView* columnArray, int scale, + int byteLength, const std::string& timezone) { + int returnCode = 0; + nanoarrow::UniqueSchema newUniqueField; + nanoarrow::UniqueArray newUniqueArray; + ArrowSchema* newSchema = newUniqueField.get(); + ArrowArray* newArray = newUniqueArray.get(); + ArrowError error; + + ArrowSchemaInit(newSchema); + newSchema->flags &= (field->schema->flags & ARROW_FLAG_NULLABLE); + + ArrowArrayView* epochArray = nullptr; + ArrowArrayView* fractionArray = nullptr; + for (int64_t i = 
0; i < field->schema->n_children; i++) { + const char* name = field->schema->children[i]->name; + if (std::strcmp(name, internal::FIELD_NAME_EPOCH.c_str()) == 0) + epochArray = columnArray->children[i]; + else if (std::strcmp(name, internal::FIELD_NAME_FRACTION.c_str()) == 0) + fractionArray = columnArray->children[i]; + } + + bool has_overflow = m_force_microsecond_precision; + if (!m_force_microsecond_precision && scale > 6 && byteLength == 16) { + has_overflow = _checkNanosecondTimestampOverflowAndDownscale( + columnArray, epochArray, fractionArray); + } + + auto timeunit = NANOARROW_TIME_UNIT_SECOND; + if (scale == 0) + timeunit = NANOARROW_TIME_UNIT_SECOND; + else if (scale <= 3) + timeunit = NANOARROW_TIME_UNIT_MILLI; + else if (scale <= 6) + timeunit = NANOARROW_TIME_UNIT_MICRO; + else + timeunit = + has_overflow ? NANOARROW_TIME_UNIT_MICRO : NANOARROW_TIME_UNIT_NANO; + + const char* tz = timezone.empty() ? nullptr : timezone.c_str(); + returnCode = ArrowSchemaSetTypeDateTime( + newSchema, NANOARROW_TYPE_TIMESTAMP, timeunit, tz); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema DateTime, " + "error code: %d", + returnCode); + returnCode = ArrowSchemaSetName(newSchema, field->schema->name); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error setting schema name, " + "error code: %d", + returnCode); + returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error initializing array: %s, " + "error code: %d", + ArrowErrorMessage(&error), returnCode); + + for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { + if (ArrowArrayViewIsNull(columnArray, rowIdx)) { + returnCode = ArrowArrayAppendNull(newArray, 1); + } else { + int64_t epoch = ArrowArrayViewGetIntUnsafe(epochArray, rowIdx); + if (byteLength == 8) { + int64_t val; + if (scale == 0) + val = epoch; + else if (scale <= 3) + val = epoch * sf::internal::powTenSB4[3 - scale]; + else 
if (scale <= 6) + val = epoch * sf::internal::powTenSB4[6 - scale]; + else + val = has_overflow ? (epoch * sf::internal::powTenSB4[6]) + : (epoch * sf::internal::powTenSB4[9 - scale]); + returnCode = ArrowArrayAppendInt(newArray, val); + } else if (byteLength == 16) { + int64_t frac = ArrowArrayViewGetIntUnsafe(fractionArray, rowIdx); + int64_t val; + if (scale == 0) + val = epoch; + else if (scale <= 3) + val = epoch * sf::internal::powTenSB4[3 - scale] + + frac / sf::internal::powTenSB4[6]; + else if (scale <= 6) + val = epoch * sf::internal::powTenSB4[6] + + frac / sf::internal::powTenSB4[3]; + else + val = has_overflow + ? (epoch * sf::internal::powTenSB4[6] + frac / 1000) + : (epoch * sf::internal::powTenSB4[9] + frac); + returnCode = ArrowArrayAppendInt(newArray, val); + } else { + PyErr_SetString( + PyExc_Exception, + Logger::formatString( + "[Snowflake Exception] unknown byteLength(%d) for " + "TIMESTAMP_TZ", + byteLength).c_str()); + return; + } + } + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error appending timestamp_tz, " + "error code: %d", + returnCode); + } + + returnCode = ArrowArrayFinishBuildingDefault(newArray, &error); + SF_CHECK_ARROW_RC(returnCode, + "[Snowflake Exception] error finishing array: %s, " + "error code: %d", + ArrowErrorMessage(&error), returnCode); + field->schema->release(field->schema); + ArrowSchemaMove(newSchema, field->schema); + columnArray->array->release(columnArray->array); + ArrowArrayMove(newArray, columnArray->array); +} + +} // namespace sf diff --git a/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/ArrowTableConverter.hpp b/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/ArrowTableConverter.hpp new file mode 100644 index 000000000..dfe3d914b --- /dev/null +++ b/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/ArrowTableConverter.hpp @@ -0,0 +1,63 @@ +#ifndef PC_ARROWTABLECONVERTER_HPP +#define PC_ARROWTABLECONVERTER_HPP + +#include + 
+#include "CArrowIterator.hpp" +#include "logging.hpp" +#include "nanoarrow.h" +#include "nanoarrow.hpp" + +namespace sf { + +/** + * Stateless-ish helper that applies Snowflake-specific type conversions to + * Arrow columns in-place. Both CArrowTableIterator (IPC-based) and + * CArrowStreamTableIterator (stream-based) delegate all per-column + * conversion to this class so the logic lives in a single place. + * + * Construction captures the three session-level knobs that affect + * conversion behaviour; individual columns are converted via + * convertIfNeeded(). + */ +class ArrowTableConverter { + public: + ArrowTableConverter(bool number_to_decimal, bool force_microsecond_precision, const std::string& timezone); + + /** + * Inspect the Snowflake logical-type metadata on @p columnSchema and + * convert @p columnArray in-place when necessary. + * + * Sets a Python exception on failure (caller should check + * py::checkPyError() after the call). + */ + void convertIfNeeded(ArrowSchema* columnSchema, ArrowArrayView* columnArray); + + private: + void convertScaledFixedNumberColumn_nanoarrow(ArrowSchemaView* field, ArrowArrayView* columnArray, unsigned int scale); + + void convertScaledFixedNumberColumnToDecimalColumn_nanoarrow(ArrowSchemaView* field, ArrowArrayView* columnArray, unsigned int scale); + + void convertScaledFixedNumberColumnToDoubleColumn_nanoarrow(ArrowSchemaView* field, ArrowArrayView* columnArray, unsigned int scale); + + void convertTimeColumn_nanoarrow(ArrowSchemaView* field, ArrowArrayView* columnArray, int scale); + + void convertIntervalDayTimeColumn_nanoarrow(ArrowSchemaView* field, ArrowArrayView* columnArray, int scale); + + void convertTimestampColumn_nanoarrow(ArrowSchemaView* field, ArrowArrayView* columnArray, int scale, const std::string& timezone = ""); + + void convertTimestampTZColumn_nanoarrow(ArrowSchemaView* field, ArrowArrayView* columnArray, int scale, int byteLength, const std::string& timezone); + + template + double 
convertScaledFixedNumberToDouble(unsigned int scale, T originalValue); + + const bool m_convert_number_to_decimal; + const bool m_force_microsecond_precision; + const std::string m_timezone; + + static Logger* logger; +}; + +} // namespace sf + +#endif // PC_ARROWTABLECONVERTER_HPP diff --git a/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/CArrowStreamTableIterator.cpp b/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/CArrowStreamTableIterator.cpp new file mode 100644 index 000000000..fb0721f7d --- /dev/null +++ b/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/CArrowStreamTableIterator.cpp @@ -0,0 +1,244 @@ +#include "CArrowStreamTableIterator.hpp" + +#include +#include + +#include "Python/Common.hpp" + +namespace sf { + +Logger* CArrowStreamTableIterator::logger = + new Logger("snowflake.connector.CArrowStreamTableIterator"); + +namespace { + void releaseStream(ArrowArrayStream* s) { + if (s && s->release) s->release(s); + } +} // namespace + +// --------------------------------------------------------------------------- +// Factory / constructor +// --------------------------------------------------------------------------- + +std::unique_ptr CArrowStreamTableIterator::from_stream(int64_t stream_ptr, PyObject* context, bool number_to_decimal, bool force_microsecond_precision) { + auto* stream = reinterpret_cast(stream_ptr); + if (!stream || !stream->release) { + PyErr_SetString(PyExc_Exception, "[Snowflake Exception] Invalid ArrowArrayStream pointer"); + return nullptr; + } + + ArrowSchema schema{}; + int rc = stream->get_schema(stream, &schema); + if (rc != 0) { + const char* msg = stream->get_last_error(stream); + std::string err = Logger::formatString( + "[Snowflake Exception] error getting schema: %s, error code: %d", msg ? 
msg : "unknown", rc); + logger->error(__FILE__, __func__, __LINE__, err.c_str()); + PyErr_SetString(PyExc_Exception, err.c_str()); + return nullptr; + } + + return std::unique_ptr( + new CArrowStreamTableIterator(stream, &schema, context, number_to_decimal, force_microsecond_precision)); +} + +CArrowStreamTableIterator::CArrowStreamTableIterator(ArrowArrayStream* stream, ArrowSchema* schema, PyObject* context, bool number_to_decimal, bool force_microsecond_precision) + : m_stream(stream, releaseStream), + m_columnCount(schema->n_children), + m_context(context) { + ArrowSchemaMove(schema, m_schema.get()); + + char* timezone = nullptr; + py::UniqueRef tz(PyObject_GetAttrString(m_context, "_timezone")); + PyArg_Parse(tz.get(), "z", &timezone); + + m_converter = std::unique_ptr( + new ArrowTableConverter(number_to_decimal, force_microsecond_precision, timezone ? std::string(timezone) : std::string())); +} + +// --------------------------------------------------------------------------- +// Export accessors +// --------------------------------------------------------------------------- + +uintptr_t CArrowStreamTableIterator::getArrowArrayPtr() { + return reinterpret_cast(m_exportArray.get()); +} + +uintptr_t CArrowStreamTableIterator::getArrowSchemaPtr() { + return reinterpret_cast(m_exportSchema.get()); +} + +uintptr_t CArrowStreamTableIterator::getConvertedSchemaPtr() { + m_streamSchemaExport.reset(); + int rc = ArrowSchemaDeepCopy(m_schema.get(), m_streamSchemaExport.get()); + if (rc != NANOARROW_OK) { + std::string err = Logger::formatString( + "[Snowflake Exception] error deep-copying stream schema, error code: %d", rc); + logger->error(__FILE__, __func__, __LINE__, err.c_str()); + PyErr_SetString(PyExc_RuntimeError, err.c_str()); + return 0; + } + + ArrowError error; + + nanoarrow::UniqueArray emptyArray; + rc = ArrowArrayInitFromSchema(emptyArray.get(), m_schema.get(), &error); + if (rc != NANOARROW_OK) { + std::string err = Logger::formatString( + "[Snowflake 
Exception] error initializing empty array for schema conversion: %s, error code: %d", + ArrowErrorMessage(&error), rc); + logger->error(__FILE__, __func__, __LINE__, err.c_str()); + PyErr_SetString(PyExc_RuntimeError, err.c_str()); + return 0; + } + rc = ArrowArrayFinishBuildingDefault(emptyArray.get(), &error); + if (rc != NANOARROW_OK) { + std::string err = Logger::formatString( + "[Snowflake Exception] error finishing empty array: %s, error code: %d", + ArrowErrorMessage(&error), rc); + logger->error(__FILE__, __func__, __LINE__, err.c_str()); + PyErr_SetString(PyExc_RuntimeError, err.c_str()); + return 0; + } + + nanoarrow::UniqueArrayView emptyView; + rc = ArrowArrayViewInitFromSchema(emptyView.get(), m_schema.get(), &error); + if (rc != NANOARROW_OK) { + std::string err = Logger::formatString( + "[Snowflake Exception] error initializing empty array view: %s, error code: %d", + ArrowErrorMessage(&error), rc); + logger->error(__FILE__, __func__, __LINE__, err.c_str()); + PyErr_SetString(PyExc_RuntimeError, err.c_str()); + return 0; + } + rc = ArrowArrayViewSetArray(emptyView.get(), emptyArray.get(), &error); + if (rc != NANOARROW_OK) { + std::string err = Logger::formatString( + "[Snowflake Exception] error setting empty array view: %s, error code: %d", + ArrowErrorMessage(&error), rc); + logger->error(__FILE__, __func__, __LINE__, err.c_str()); + PyErr_SetString(PyExc_RuntimeError, err.c_str()); + return 0; + } + + for (int64_t col = 0; col < m_columnCount; col++) { + m_converter->convertIfNeeded(m_streamSchemaExport->children[col], emptyView->children[col]); + if (py::checkPyError()) return 0; + } + + return reinterpret_cast(m_streamSchemaExport.get()); +} + +// --------------------------------------------------------------------------- +// Stream loading +// --------------------------------------------------------------------------- + +bool CArrowStreamTableIterator::loadNextBatch() { + if (m_streamExhausted) return false; + + m_currentArray.reset(); + 
m_currentArrayView.reset(); + + int returnCode; + ArrowArrayStream* stream = m_stream.get(); + { + Py_BEGIN_ALLOW_THREADS + returnCode = stream->get_next(stream, m_currentArray.get()); + Py_END_ALLOW_THREADS + } + + if (returnCode != NANOARROW_OK) { + const char* msg = stream->get_last_error(stream); + std::string err = Logger::formatString( + "[Snowflake Exception] error getting next batch: %s, error code: %d", msg ? msg : "unknown", returnCode); + logger->error(__FILE__, __func__, __LINE__, err.c_str()); + PyErr_SetString(PyExc_Exception, err.c_str()); + return false; + } + + if (m_currentArray->release == nullptr) { + m_streamExhausted = true; + return false; + } + + if (m_currentArray->length == 0) { + return loadNextBatch(); + } + + ArrowError error; + returnCode = ArrowArrayViewInitFromSchema(m_currentArrayView.get(), m_schema.get(), &error); + if (returnCode != NANOARROW_OK) { + std::string err = Logger::formatString( + "[Snowflake Exception] error initializing ArrowArrayView: %s, error code: %d", + ArrowErrorMessage(&error), returnCode); + logger->error(__FILE__, __func__, __LINE__, err.c_str()); + PyErr_SetString(PyExc_Exception, err.c_str()); + return false; + } + + returnCode = ArrowArrayViewSetArray(m_currentArrayView.get(), m_currentArray.get(), &error); + if (returnCode != NANOARROW_OK) { + std::string err = Logger::formatString( + "[Snowflake Exception] error setting ArrowArrayView: %s, error code: %d", + ArrowErrorMessage(&error), returnCode); + logger->error(__FILE__, __func__, __LINE__, err.c_str()); + PyErr_SetString(PyExc_Exception, err.c_str()); + return false; + } + + return true; +} + +// --------------------------------------------------------------------------- +// convertBatch() +// --------------------------------------------------------------------------- + +void CArrowStreamTableIterator::convertBatch() { + m_exportArray.reset(); + m_exportSchema.reset(); + + int rc = ArrowSchemaDeepCopy(m_schema.get(), m_exportSchema.get()); + 
SF_CHECK_ARROW_RC(rc, "[Snowflake Exception] error deep-copying schema, error code: %d", rc); + + for (int64_t col = 0; col < m_columnCount; col++) { + m_converter->convertIfNeeded(m_exportSchema->children[col], m_currentArrayView->children[col]); + if (py::checkPyError()) return; + } + + ArrowArrayMove(m_currentArray.get(), m_exportArray.get()); +} + +// --------------------------------------------------------------------------- +// next() +// --------------------------------------------------------------------------- + +ReturnVal CArrowStreamTableIterator::next() { + if (!loadNextBatch()) { + if (py::checkPyError()) { + PyObject *type, *val, *traceback; + PyErr_Fetch(&type, &val, &traceback); + PyErr_Clear(); + m_currentPyException.reset(val); + Py_XDECREF(type); + Py_XDECREF(traceback); + return ReturnVal(nullptr, m_currentPyException.get()); + } + return ReturnVal(Py_None, nullptr); + } + + convertBatch(); + + if (py::checkPyError()) { + PyObject *type, *val, *traceback; + PyErr_Fetch(&type, &val, &traceback); + PyErr_Clear(); + m_currentPyException.reset(val); + Py_XDECREF(type); + Py_XDECREF(traceback); + return ReturnVal(nullptr, m_currentPyException.get()); + } + + return ReturnVal(Py_True, nullptr); +} + +} // namespace sf diff --git a/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/CArrowStreamTableIterator.hpp b/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/CArrowStreamTableIterator.hpp new file mode 100644 index 000000000..ab31e428d --- /dev/null +++ b/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/CArrowStreamTableIterator.hpp @@ -0,0 +1,163 @@ +#ifndef PC_CARROWSTREAMTABLEITERATOR_HPP +#define PC_CARROWSTREAMTABLEITERATOR_HPP + +#include + +#include "ArrowTableConverter.hpp" +#include "CArrowIterator.hpp" +#include "Python/Common.hpp" +#include "logging.hpp" +#include "nanoarrow.h" +#include "nanoarrow.hpp" + +namespace sf { + +/** + * Stream-based Arrow iterator that reads batches from 
an ArrowArrayStream + * and exposes each batch as a (ArrowArray, ArrowSchema) pair suitable for + * import into PyArrow via RecordBatch._import_from_c(). + * + * Snowflake-specific type conversions (e.g. scaled NUMBER -> float64, + * struct-encoded TIMESTAMP -> Arrow timestamp) are applied to every batch + * through the shared ArrowTableConverter. + * + * Lifecycle (driven from the Cython wrapper ArrowDummyIterator): + * 1. from_stream() – validate stream, read schema, construct instance + * 2. next() – load next batch, convert, expose via export slots + * 3. getArrowArrayPtr() / getArrowSchemaPtr() + * – caller passes these to _import_from_c() which + * consumes them (sets release to nullptr) + * 4. repeat 2-3 until next() signals exhaustion (Py_None) + */ +class CArrowStreamTableIterator { + public: + /** + * Factory method. Validates the stream pointer, reads the schema, and + * returns a ready-to-iterate instance. + * + * @param stream_ptr Pointer to ArrowArrayStream as int64 (FFI from Python) + * @param context Python ArrowConverterContext carrying session timezone + * @param number_to_decimal + * If true, scaled FIXED columns become Decimal128; otherwise float64 + * @param force_microsecond_precision + * If true, all TIMESTAMP columns are forced to microsecond precision, + * ensuring consistent schema across batches whose data may span + * outside the nanosecond-representable range (1677-2262) + * @return Unique pointer to the iterator, or nullptr with a Python + * exception set on failure + */ + static std::unique_ptr from_stream( + int64_t stream_ptr, PyObject* context, bool number_to_decimal, + bool force_microsecond_precision); + + /** + * Advance to the next batch. 
+ * + * @return ReturnVal with: + * - successObj = Py_True, exception = nullptr → batch ready; call + * getArrowArrayPtr() / getArrowSchemaPtr() to retrieve it + * - successObj = Py_None, exception = nullptr → stream exhausted + * - successObj = nullptr, exception != nullptr → error occurred + */ + ReturnVal next(); + + /** + * Raw pointer (as uintptr_t) to the converted ArrowArray for the + * current batch. Valid only immediately after next() returns Py_True. + * Ownership transfers to the caller via _import_from_c(). + */ + uintptr_t getArrowArrayPtr(); + + /** + * Raw pointer (as uintptr_t) to the deep-copied ArrowSchema for the + * current batch. Same ownership semantics as getArrowArrayPtr(). + */ + uintptr_t getArrowSchemaPtr(); + + /** + * Raw pointer (as uintptr_t) to the stream schema after applying + * Snowflake type conversions (e.g. struct → timestamp, int → float64). + * Unlike getArrowSchemaPtr(), this is always valid—even when no batch + * has been loaded—making it safe to call for empty result sets + * (e.g. to build an empty PyArrow table with the correct column types). + * + * Internally synthesises a zero-row batch and runs it through + * ArrowTableConverter so the schema matches what non-empty results + * would produce. + * + * Ownership transfers to the caller via _import_from_c(). + */ + uintptr_t getConvertedSchemaPtr(); + + private: + CArrowStreamTableIterator(ArrowArrayStream* stream, ArrowSchema* schema, + PyObject* context, bool number_to_decimal, + bool force_microsecond_precision); + + /** + * Pull the next non-empty batch from the stream into m_currentArray. + * Releases the GIL during stream->get_next() so concurrent chunk + * downloads (e.g. from S3) are not blocked by the Python interpreter. + * Also initialises m_currentArrayView for the conversion step. 
+ * + * @return true if a batch was loaded; false if exhausted or error + */ + bool loadNextBatch(); + + /** + * Apply Snowflake type conversions to the current batch via + * m_converter, then move the result into the export slots + * (m_exportArray / m_exportSchema). + * Sets a Python exception on failure (check py::checkPyError()). + */ + void convertBatch(); + + /** Owned handle to the ArrowArrayStream; released on destruction. */ + std::unique_ptr m_stream; + + /** Stream schema, read once at construction; immutable across batches. */ + nanoarrow::UniqueSchema m_schema; + + /** Raw batch from the most recent stream->get_next() call. */ + nanoarrow::UniqueArray m_currentArray; + + /** View over m_currentArray; rebuilt each loadNextBatch() for the + * converter to read column data through. */ + nanoarrow::UniqueArrayView m_currentArrayView; + + /** Converted batch ready for PyArrow import. Populated by + * convertBatch(); consumed by _import_from_c() via getArrowArrayPtr(). */ + nanoarrow::UniqueArray m_exportArray; + + /** Deep-copied and possibly rewritten schema for the current batch. + * Each batch gets its own copy because conversion may alter child + * schemas (e.g. struct → timestamp). Consumed via getArrowSchemaPtr(). */ + nanoarrow::UniqueSchema m_exportSchema; + + /** Deep copy of m_schema, rewritten by the converter and exported by + * getConvertedSchemaPtr(). Separate from m_exportSchema so it can be + * produced independently of batch iteration. */ + nanoarrow::UniqueSchema m_streamSchemaExport; + + /** Set to true once stream->get_next() returns an empty release sentinel. */ + bool m_streamExhausted = false; + + /** Number of top-level columns, cached from schema.n_children. */ + int64_t m_columnCount = 0; + + /** Python ArrowConverterContext; borrowed reference kept alive by the + * Cython wrapper which stores it as `self.arrow_context`. */ + PyObject* m_context; + + /** Applies Snowflake-specific type conversions per column. 
*/ + std::unique_ptr m_converter; + + /** Holds the most recent Python exception so the pointer remains valid + * until the next call into the iterator. */ + py::UniqueRef m_currentPyException; + + static Logger* logger; +}; + +} // namespace sf + +#endif // PC_CARROWSTREAMTABLEITERATOR_HPP diff --git a/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/CArrowTableIterator.cpp b/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/CArrowTableIterator.cpp deleted file mode 100644 index 8e42c2024..000000000 --- a/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/CArrowTableIterator.cpp +++ /dev/null @@ -1,1064 +0,0 @@ -#include "CArrowTableIterator.hpp" - -#include -#include -#include -#include - -#include "Python/Common.hpp" -#include "SnowflakeType.hpp" -#include "Util/time.hpp" - -namespace sf { - -void CArrowTableIterator::convertIfNeeded(ArrowSchema* columnSchema, ArrowArrayView* columnArray) { - ArrowSchemaView columnSchemaView; - ArrowError error; - int returnCode; - - returnCode = ArrowSchemaViewInit(&columnSchemaView, columnSchema, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error initializing " - "ArrowSchemaView : %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - - ArrowStringView snowflakeLogicalType; - const char* metadata = columnSchema->metadata; - returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("logicalType"), &snowflakeLogicalType); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error getting 'logicalType' " - "from Arrow metadata, error code: %d", - returnCode); - SnowflakeType::Type st = SnowflakeType::snowflakeTypeFromString( - std::string(snowflakeLogicalType.data, snowflakeLogicalType.size_bytes)); - - // reconstruct columnArray in place - switch (st) { - case SnowflakeType::Type::FIXED: { - int scale = 0; - struct ArrowStringView scaleString = ArrowCharView(nullptr); - if (metadata != nullptr) { - returnCode = 
ArrowMetadataGetValue(metadata, ArrowCharView("scale"), &scaleString); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error getting 'scale' " - "from Arrow metadata, error code: %d", - returnCode); - scale = std::stoi(std::string(scaleString.data, scaleString.size_bytes)); - } - if (scale > 0 && columnSchemaView.type != ArrowType::NANOARROW_TYPE_DECIMAL128) { - logger->debug(__FILE__, __func__, __LINE__, - "Convert fixed number column to double column, " - "column scale %d, column type id: %d", - scale, columnSchemaView.type); - convertScaledFixedNumberColumn_nanoarrow(&columnSchemaView, columnArray, scale); - } - break; - } - - case SnowflakeType::Type::ANY: - case SnowflakeType::Type::BINARY: - case SnowflakeType::Type::BOOLEAN: - case SnowflakeType::Type::CHAR: - case SnowflakeType::Type::DATE: - case SnowflakeType::Type::REAL: - case SnowflakeType::Type::TEXT: - case SnowflakeType::Type::INTERVAL_YEAR_MONTH: - case SnowflakeType::Type::VARIANT: - case SnowflakeType::Type::VECTOR: { - // Do not need to convert - break; - } - - case SnowflakeType::Type::ARRAY: { - switch (columnSchemaView.type) { - case NANOARROW_TYPE_STRING: { - // No need to convert json encoded array - break; - } - case NANOARROW_TYPE_LIST: { - if (columnSchemaView.schema->n_children != 1) { - std::string errorInfo = Logger::formatString( - "[Snowflake Exception] invalid arrow schema for array items " - "expected 1 " - "schema child, but got %d", - columnSchemaView.schema->n_children); - logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str()); - PyErr_SetString(PyExc_Exception, errorInfo.c_str()); - break; - } - - ArrowSchema* item_schema = columnSchemaView.schema->children[0]; - ArrowArrayView* item_array = columnArray->children[0]; - convertIfNeeded(item_schema, item_array); - break; - } - default: { - std::string errorInfo = Logger::formatString( - "[Snowflake Exception] unknown arrow internal data type(%s) " - "for ARRAY data in %s", - 
NANOARROW_TYPE_ENUM_STRING[columnSchemaView.type], columnSchemaView.schema->name); - logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str()); - PyErr_SetString(PyExc_Exception, errorInfo.c_str()); - break; - } - } - break; - } - case SnowflakeType::Type::MAP: { - if (columnSchemaView.schema->n_children != 1) { - std::string errorInfo = Logger::formatString( - "[Snowflake Exception] invalid arrow schema for map entries " - "expected 1 " - "schema child, but got %d", - columnSchemaView.schema->n_children); - logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str()); - PyErr_SetString(PyExc_Exception, errorInfo.c_str()); - break; - } - - ArrowSchema* entries = columnSchemaView.schema->children[0]; - if (entries->n_children != 2) { - std::string errorInfo = Logger::formatString( - "[Snowflake Exception] invalid arrow schema for map key/value " - "pair " - "expected 2 entries, but got %d", - entries->n_children); - logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str()); - PyErr_SetString(PyExc_Exception, errorInfo.c_str()); - break; - } - - ArrowSchema* key_schema = entries->children[0]; - ArrowArrayView* key_array = columnArray->children[0]->children[0]; - convertIfNeeded(key_schema, key_array); - - ArrowSchema* value_schema = entries->children[1]; - ArrowArrayView* value_array = columnArray->children[0]->children[1]; - convertIfNeeded(value_schema, value_array); - break; - } - - case SnowflakeType::Type::OBJECT: { - switch (columnSchemaView.type) { - case NANOARROW_TYPE_STRING: { - // No need to convert json encoded data - break; - } - case NANOARROW_TYPE_STRUCT: { - // Object field names are strings that do not need conversion - // Child values values may need conversion. 
- for (int i = 0; i < columnSchemaView.schema->n_children; i++) { - ArrowSchema* property_schema = columnSchemaView.schema->children[i]; - ArrowArrayView* child_array = columnArray->children[i]; - convertIfNeeded(property_schema, child_array); - } - break; - } - default: { - std::string errorInfo = Logger::formatString( - "[Snowflake Exception] unknown arrow internal data type(%s) " - "for OBJECT data in %s", - NANOARROW_TYPE_ENUM_STRING[columnSchemaView.type], columnSchemaView.schema->name); - logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str()); - PyErr_SetString(PyExc_Exception, errorInfo.c_str()); - break; - } - } - break; - } - - case SnowflakeType::Type::INTERVAL_DAY_TIME: { - int scale = 9; - if (metadata != nullptr) { - struct ArrowStringView scaleString = ArrowCharView(nullptr); - returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("scale"), &scaleString); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error getting 'scale' " - "from Arrow metadata, error code: %d", - returnCode); - scale = std::stoi(std::string(scaleString.data, scaleString.size_bytes)); - } - convertIntervalDayTimeColumn_nanoarrow(&columnSchemaView, columnArray, scale); - break; - } - - case SnowflakeType::Type::TIME: { - int scale = 9; - if (metadata != nullptr) { - struct ArrowStringView scaleString = ArrowCharView(nullptr); - returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("scale"), &scaleString); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error getting 'scale' " - "from Arrow metadata, error code: %d", - returnCode); - scale = std::stoi(std::string(scaleString.data, scaleString.size_bytes)); - } - - convertTimeColumn_nanoarrow(&columnSchemaView, columnArray, scale); - break; - } - - case SnowflakeType::Type::TIMESTAMP_NTZ: { - int scale = 9; - if (metadata != nullptr) { - struct ArrowStringView scaleString = ArrowCharView(nullptr); - returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("scale"), &scaleString); - 
SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error getting 'scale' " - "from Arrow metadata, error code: %d", - returnCode); - scale = std::stoi(std::string(scaleString.data, scaleString.size_bytes)); - } - convertTimestampColumn_nanoarrow(&columnSchemaView, columnArray, scale); - break; - } - - case SnowflakeType::Type::TIMESTAMP_LTZ: { - int scale = 9; - if (metadata != nullptr) { - struct ArrowStringView scaleString = ArrowCharView(nullptr); - returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("scale"), &scaleString); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error getting 'scale' " - "from Arrow metadata, error code: %d", - returnCode); - scale = std::stoi(std::string(scaleString.data, scaleString.size_bytes)); - } - convertTimestampColumn_nanoarrow(&columnSchemaView, columnArray, scale, m_timezone); - break; - } - - case SnowflakeType::Type::TIMESTAMP_TZ: { - struct ArrowStringView scaleString = ArrowCharView(nullptr); - struct ArrowStringView byteLengthString = ArrowCharView(nullptr); - int scale = 9; - int byteLength = 16; - if (metadata != nullptr) { - returnCode = ArrowMetadataGetValue(metadata, ArrowCharView("scale"), &scaleString); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error getting 'scale' " - "from Arrow metadata, error code: %d", - returnCode); - returnCode = - ArrowMetadataGetValue(metadata, ArrowCharView("byteLength"), &byteLengthString); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error getting 'byteLength' from Arrow " - "metadata, error code: %d", - returnCode); - scale = std::stoi(std::string(scaleString.data, scaleString.size_bytes)); - // Data inside a structured type may not have bytelength metadata. - // Use default in this case. 
- if (byteLengthString.data != nullptr) { - byteLength = std::stoi(std::string(byteLengthString.data, byteLengthString.size_bytes)); - } - } - - convertTimestampTZColumn_nanoarrow(&columnSchemaView, columnArray, scale, byteLength, - m_timezone); - break; - } - - default: { - std::string errorInfo = Logger::formatString( - "[Snowflake Exception] unknown snowflake data type : %s", snowflakeLogicalType.data); - logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str()); - PyErr_SetString(PyExc_Exception, errorInfo.c_str()); - return; - } - } -} - -/** - * This function is to make sure the arrow table can be successfully converted - * to pandas dataframe using arrow's to_pandas method. Since some Snowflake - * arrow columns are not supported, this method can map those to supported ones. - * Specifically, - * All Snowflake fixed number with scale > 0 (expect decimal) will be - * converted to Arrow float64/double column All Snowflake time columns will be - * converted to Arrow Time column with unit = second, milli, or, micro. 
All - * Snowflake timestamp columns will be converted to Arrow timestamp columns - * Specifically, - * timestampntz will be converted to Arrow timestamp with UTC - * timestampltz will be converted to Arrow timestamp with session time zone - * timestamptz will be converted to Arrow timestamp with UTC - * Since Arrow timestamp use int64_t internally so it may be out of range for - * small and large timestamps - */ -void CArrowTableIterator::reconstructRecordBatches_nanoarrow() { - int returnCode = 0; - // Type conversion, the code needs to be optimized - for (unsigned int batchIdx = 0; batchIdx < m_ipcArrowArrayViewVec.size(); batchIdx++) { - nanoarrow::UniqueSchema copiedSchema; - returnCode = ArrowSchemaDeepCopy(m_ipcArrowSchema.get(), copiedSchema.get()); - SF_CHECK_ARROW_RC( - returnCode, "[Snowflake Exception] error copying arrow schema, error code: %d", returnCode); - m_ipcSchemaArrayVec.push_back(std::move(copiedSchema)); - - for (int colIdx = 0; colIdx < m_ipcSchemaArrayVec[batchIdx]->n_children; colIdx++) { - ArrowArrayView* columnArray = m_ipcArrowArrayViewVec[batchIdx]->children[colIdx]; - ArrowSchema* columnSchema = m_ipcSchemaArrayVec[batchIdx]->children[colIdx]; - convertIfNeeded(columnSchema, columnArray); - } - m_tableConverted = true; - } -} - -CArrowTableIterator::CArrowTableIterator(PyObject* context, char* arrow_bytes, - int64_t arrow_bytes_size, const bool number_to_decimal, - const bool force_microsecond_precision) - : CArrowIterator(arrow_bytes, arrow_bytes_size), - m_context(context), - m_convert_number_to_decimal(number_to_decimal), - m_force_microsecond_precision(force_microsecond_precision) { - if (py::checkPyError()) { - return; - } - py::UniqueRef tz(PyObject_GetAttrString(m_context, "_timezone")); - PyArg_Parse(tz.get(), "s", &m_timezone); -} - -ReturnVal CArrowTableIterator::next() { - bool firstDone = this->convertRecordBatchesToTable_nanoarrow(); - if (firstDone && !m_ipcArrowArrayVec.empty()) { - return ReturnVal(Py_True, nullptr); - 
} else { - return ReturnVal(Py_None, nullptr); - } -} - -template -double CArrowTableIterator::convertScaledFixedNumberToDouble(const unsigned int scale, - T originalValue) { - if (scale < 9) { - // simply use divide to convert decimal value in double - return (double)originalValue / sf::internal::powTenSB4[scale]; - } else { - // when scale is large, convert the value to string first and then convert - // it to double otherwise, it may loss precision - std::string valStr = std::to_string(originalValue); - int negative = valStr.at(0) == '-' ? 1 : 0; - unsigned int digits = valStr.length() - negative; - if (digits <= scale) { - int numOfZeroes = scale - digits + 1; - valStr.insert(negative, std::string(numOfZeroes, '0')); - } - valStr.insert(valStr.length() - scale, "."); - std::size_t offset = 0; - return std::stod(valStr, &offset); - } -} - -void CArrowTableIterator::convertScaledFixedNumberColumn_nanoarrow(ArrowSchemaView* field, - ArrowArrayView* columnArray, - const unsigned int scale) { - // Convert scaled fixed number to either Double, or Decimal based on setting - if (m_convert_number_to_decimal) { - convertScaledFixedNumberColumnToDecimalColumn_nanoarrow(field, columnArray, scale); - } else { - convertScaledFixedNumberColumnToDoubleColumn_nanoarrow(field, columnArray, scale); - } -} - -void CArrowTableIterator::convertScaledFixedNumberColumnToDecimalColumn_nanoarrow( - ArrowSchemaView* field, ArrowArrayView* columnArray, const unsigned int scale) { - int returnCode = 0; - // Convert to arrow double/float64 column - nanoarrow::UniqueSchema newUniqueField; - nanoarrow::UniqueArray newUniqueArray; - ArrowSchema* newSchema = newUniqueField.get(); - ArrowArray* newArray = newUniqueArray.get(); - - // create new schema - ArrowSchemaInit(newSchema); - newSchema->flags &= (field->schema->flags & ARROW_FLAG_NULLABLE); // map to nullable() - returnCode = ArrowSchemaSetTypeDecimal(newSchema, NANOARROW_TYPE_DECIMAL128, 38, scale); - SF_CHECK_ARROW_RC(returnCode, - 
"[Snowflake Exception] error setting arrow schema type " - "decimal, error code: %d", - returnCode); - returnCode = ArrowSchemaSetName(newSchema, field->schema->name); - SF_CHECK_ARROW_RC(returnCode, "[Snowflake Exception] error setting schema name, error code: %d", - returnCode); - - ArrowError error; - returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error initializing ArrowArrayView " - "from schema : %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - - returnCode = ArrowArrayStartAppending(newArray); - SF_CHECK_ARROW_RC(returnCode, "[Snowflake Exception] error appending arrow array, error code: %d", - returnCode); - - for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { - if (ArrowArrayViewIsNull(columnArray, rowIdx)) { - returnCode = ArrowArrayAppendNull(newArray, 1); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending null to arrow " - "array, error code: %d", - returnCode); - } else { - auto originalVal = ArrowArrayViewGetIntUnsafe(columnArray, rowIdx); - ArrowDecimal arrowDecimal; - ArrowDecimalInit(&arrowDecimal, 128, 38, scale); - ArrowDecimalSetInt(&arrowDecimal, originalVal); - returnCode = ArrowArrayAppendDecimal(newArray, &arrowDecimal); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending decimal to " - "arrow array, error code: %d", - returnCode); - } - } - returnCode = ArrowArrayFinishBuildingDefault(newArray, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error finishing building arrow " - "array: %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - field->schema->release(field->schema); - ArrowSchemaMove(newSchema, field->schema); - columnArray->array->release(columnArray->array); - ArrowArrayMove(newArray, columnArray->array); -} - -void CArrowTableIterator::convertScaledFixedNumberColumnToDoubleColumn_nanoarrow( - ArrowSchemaView* field, ArrowArrayView* 
columnArray, const unsigned int scale) { - int returnCode = 0; - // Convert to arrow double/float64 column - nanoarrow::UniqueSchema newUniqueField; - nanoarrow::UniqueArray newUniqueArray; - ArrowSchema* newSchema = newUniqueField.get(); - ArrowArray* newArray = newUniqueArray.get(); - - // create new schema - ArrowSchemaInit(newSchema); - newSchema->flags &= (field->schema->flags & ARROW_FLAG_NULLABLE); // map to nullable() - returnCode = ArrowSchemaSetType(newSchema, NANOARROW_TYPE_DOUBLE); // map to arrow:float64() - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema type " - "double, error code: %d", - returnCode); - returnCode = ArrowSchemaSetName(newSchema, field->schema->name); - SF_CHECK_ARROW_RC(returnCode, "[Snowflake Exception] error setting schema name, error code: %d", - returnCode); - - ArrowError error; - returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error initializing ArrowArrayView " - "from schema : %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - - for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { - if (ArrowArrayViewIsNull(columnArray, rowIdx)) { - returnCode = ArrowArrayAppendNull(newArray, 1); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending null to arrow " - "array, error code: %d", - returnCode); - } else { - auto originalVal = ArrowArrayViewGetIntUnsafe(columnArray, rowIdx); - double val = convertScaledFixedNumberToDouble(scale, originalVal); - returnCode = ArrowArrayAppendDouble(newArray, val); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending double to arrow " - "array, error code: %d", - returnCode); - } - } - returnCode = ArrowArrayFinishBuildingDefault(newArray, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error finishing building arrow " - "array: %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - 
field->schema->release(field->schema); - ArrowSchemaMove(newSchema, field->schema); - columnArray->array->release(columnArray->array); - ArrowArrayMove(newArray, columnArray->array); -} - -void CArrowTableIterator::convertIntervalDayTimeColumn_nanoarrow(ArrowSchemaView* field, - ArrowArrayView* columnArray, - const int scale) { - int returnCode = 0; - nanoarrow::UniqueSchema newUniqueField; - nanoarrow::UniqueArray newUniqueArray; - ArrowSchema* newSchema = newUniqueField.get(); - ArrowArray* newArray = newUniqueArray.get(); - ArrowError error; - - // create new schema - ArrowSchemaInit(newSchema); - newSchema->flags &= (field->schema->flags & ARROW_FLAG_NULLABLE); // map to nullable() - - returnCode = ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_DURATION, - NANOARROW_TIME_UNIT_NANO, NULL); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema type " - "DateTime, error code: %d", - returnCode); - - returnCode = ArrowSchemaSetName(newSchema, field->schema->name); - SF_CHECK_ARROW_RC(returnCode, "[Snowflake Exception] error setting schema name, error code: %d", - returnCode); - - returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error initializing ArrowArrayView " - "from schema : %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - - returnCode = ArrowArrayStartAppending(newArray); - SF_CHECK_ARROW_RC(returnCode, "[Snowflake Exception] error appending arrow array, error code: %d", - returnCode); - - for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { - if (ArrowArrayViewIsNull(columnArray, rowIdx)) { - returnCode = ArrowArrayAppendNull(newArray, 1); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending null to arrow " - "array, error code: %d", - returnCode); - } else { - ArrowDecimal arrowDecimal; - ArrowDecimalInit(&arrowDecimal, 128, 38, 0); - ArrowArrayViewGetDecimalUnsafe(columnArray, rowIdx, 
&arrowDecimal); - auto originalVal = ArrowDecimalGetIntUnsafe(&arrowDecimal); - returnCode = ArrowArrayAppendInt(newArray, originalVal); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending int to arrow " - "array, error code: %d", - returnCode); - } - } - - returnCode = ArrowArrayFinishBuildingDefault(newArray, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error finishing building arrow " - "array: %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - field->schema->release(field->schema); - ArrowSchemaMove(newSchema, field->schema); - columnArray->array->release(columnArray->array); - ArrowArrayMove(newArray, columnArray->array); -} - -void CArrowTableIterator::convertTimeColumn_nanoarrow(ArrowSchemaView* field, - ArrowArrayView* columnArray, - const int scale) { - int returnCode = 0; - nanoarrow::UniqueSchema newUniqueField; - nanoarrow::UniqueArray newUniqueArray; - ArrowSchema* newSchema = newUniqueField.get(); - ArrowArray* newArray = newUniqueArray.get(); - ArrowError error; - - // create new schema - ArrowSchemaInit(newSchema); - int64_t powTenSB4Val = 1; - newSchema->flags &= (field->schema->flags & ARROW_FLAG_NULLABLE); // map to nullable() - if (scale == 0) { - returnCode = ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_TIME32, - NANOARROW_TIME_UNIT_SECOND, NULL); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema type " - "DateTime, error code: %d", - returnCode); - } else if (scale <= 3) { - returnCode = ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_TIME32, - NANOARROW_TIME_UNIT_MILLI, NULL); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema type " - "DateTime, error code: %d", - returnCode); - powTenSB4Val = sf::internal::powTenSB4[3 - scale]; - } else if (scale <= 6) { - returnCode = ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_TIME64, - NANOARROW_TIME_UNIT_MICRO, NULL); - SF_CHECK_ARROW_RC(returnCode, - 
"[Snowflake Exception] error setting arrow schema type " - "DateTime, error code: %d", - returnCode); - powTenSB4Val = sf::internal::powTenSB4[6 - scale]; - } else { - returnCode = ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_TIME64, - NANOARROW_TIME_UNIT_MICRO, NULL); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema type " - "DateTime, error code: %d", - returnCode); - powTenSB4Val = sf::internal::powTenSB4[scale - 6]; - } - returnCode = ArrowSchemaSetName(newSchema, field->schema->name); - SF_CHECK_ARROW_RC(returnCode, "[Snowflake Exception] error setting schema name, error code: %d", - returnCode); - - returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error initializing ArrowArrayView " - "from schema : %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - - returnCode = ArrowArrayStartAppending(newArray); - SF_CHECK_ARROW_RC(returnCode, "[Snowflake Exception] error appending arrow array, error code: %d", - returnCode); - - for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { - if (ArrowArrayViewIsNull(columnArray, rowIdx)) { - returnCode = ArrowArrayAppendNull(newArray, 1); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending null to arrow " - "array, error code: %d", - returnCode); - } else { - auto originalVal = ArrowArrayViewGetIntUnsafe(columnArray, rowIdx); - if (scale <= 6) { - originalVal *= powTenSB4Val; - } else { - originalVal /= powTenSB4Val; - } - returnCode = ArrowArrayAppendInt(newArray, originalVal); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending int to arrow " - "array, error code: %d", - returnCode); - } - } - - returnCode = ArrowArrayFinishBuildingDefault(newArray, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error finishing building arrow " - "array: %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - 
field->schema->release(field->schema); - ArrowSchemaMove(newSchema, field->schema); - columnArray->array->release(columnArray->array); - ArrowArrayMove(newArray, columnArray->array); -} - -/** - * Helper function to detect nanosecond timestamp overflow and determine if - * downscaling to microseconds is needed. - * @param columnArray The Arrow array containing the timestamp data - * @param epochArray The Arrow array containing epoch values - * @param fractionArray The Arrow array containing fraction values - * @return true if overflow was detected and downscaling to microseconds is - * safe, false otherwise - * @throws std::overflow_error if overflow is detected but downscaling would - * lose precision - */ -static bool _checkNanosecondTimestampOverflowAndDownscale(ArrowArrayView* columnArray, - ArrowArrayView* epochArray, - ArrowArrayView* fractionArray) { - int powTenSB4 = sf::internal::powTenSB4[9]; - for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { - if (!ArrowArrayViewIsNull(columnArray, rowIdx)) { - int64_t epoch = ArrowArrayViewGetIntUnsafe(epochArray, rowIdx); - int64_t fraction = ArrowArrayViewGetIntUnsafe(fractionArray, rowIdx); - if (epoch > (INT64_MAX / powTenSB4) || epoch < (INT64_MIN / powTenSB4)) { - if (fraction % 1000 != 0) { - std::string errorInfo = Logger::formatString( - "The total number of nanoseconds %d%d overflows int64 range. 
" - "If you use a timestamp with " - "the nanosecond part over 6-digits in the Snowflake database, " - "the timestamp must be " - "between '1677-09-21 00:12:43.145224192' and '2262-04-11 " - "23:47:16.854775807' to not overflow.", - epoch, fraction); - throw std::overflow_error(errorInfo.c_str()); - } else { - return true; // Safe to downscale - } - } - } - } - return false; -} - -void CArrowTableIterator::convertTimestampColumn_nanoarrow(ArrowSchemaView* field, - ArrowArrayView* columnArray, - const int scale, - const std::string timezone) { - int returnCode = 0; - nanoarrow::UniqueSchema newUniqueField; - nanoarrow::UniqueArray newUniqueArray; - ArrowSchema* newSchema = newUniqueField.get(); - ArrowArray* newArray = newUniqueArray.get(); - ArrowError error; - - ArrowSchemaInit(newSchema); - newSchema->flags &= (field->schema->flags & ARROW_FLAG_NULLABLE); // map to nullable() - - // Find epoch and fraction arrays for overflow detection - ArrowArrayView* epochArray = nullptr; - ArrowArrayView* fractionArray = nullptr; - // When m_force_microsecond_precision is true, always use microsecond - // precision to ensure consistent schema across all batches - bool has_overflow_to_downscale = m_force_microsecond_precision; - if (!m_force_microsecond_precision && scale > 6 && field->type == NANOARROW_TYPE_STRUCT) { - for (int64_t i = 0; i < field->schema->n_children; i++) { - ArrowSchema* c_schema = field->schema->children[i]; - if (std::strcmp(c_schema->name, internal::FIELD_NAME_EPOCH.c_str()) == 0) { - epochArray = columnArray->children[i]; - } else if (std::strcmp(c_schema->name, internal::FIELD_NAME_FRACTION.c_str()) == 0) { - fractionArray = columnArray->children[i]; - } else { - // do nothing - } - } - has_overflow_to_downscale = - _checkNanosecondTimestampOverflowAndDownscale(columnArray, epochArray, fractionArray); - } - - if (scale <= 6) { - int64_t powTenSB4Val = 1; - auto timeunit = NANOARROW_TIME_UNIT_SECOND; - if (scale == 0) { - timeunit = 
NANOARROW_TIME_UNIT_SECOND; - powTenSB4Val = 1; - } else if (scale <= 3) { - timeunit = NANOARROW_TIME_UNIT_MILLI; - powTenSB4Val = sf::internal::powTenSB4[3 - scale]; - } else if (scale <= 6) { - timeunit = NANOARROW_TIME_UNIT_MICRO; - powTenSB4Val = sf::internal::powTenSB4[6 - scale]; - } - if (!timezone.empty()) { - returnCode = ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_TIMESTAMP, timeunit, - timezone.c_str()); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema type " - "DateTime, error code: %d", - returnCode); - } else { - returnCode = ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_TIMESTAMP, timeunit, NULL); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema type " - "DateTime, error code: %d", - returnCode); - } - returnCode = ArrowSchemaSetName(newSchema, field->schema->name); - SF_CHECK_ARROW_RC(returnCode, "[Snowflake Exception] error setting schema name, error code: %d", - returnCode); - returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error initializing ArrowArrayView " - "from schema : %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - - for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { - if (ArrowArrayViewIsNull(columnArray, rowIdx)) { - returnCode = ArrowArrayAppendNull(newArray, 1); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending null to arrow " - "array, error code: %d", - returnCode); - } else { - int64_t val = ArrowArrayViewGetIntUnsafe(columnArray, rowIdx); - val *= powTenSB4Val; - returnCode = ArrowArrayAppendInt(newArray, val); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending int to arrow " - "array, error code: %d", - returnCode); - } - } - } else { - int64_t val; - if (field->type == NANOARROW_TYPE_STRUCT) { - ArrowArrayView* epochArray = nullptr; - ArrowArrayView* fractionArray = nullptr; - for 
(int64_t i = 0; i < field->schema->n_children; i++) { - ArrowSchema* c_schema = field->schema->children[i]; - if (std::strcmp(c_schema->name, internal::FIELD_NAME_EPOCH.c_str()) == 0) { - epochArray = columnArray->children[i]; - } else if (std::strcmp(c_schema->name, internal::FIELD_NAME_FRACTION.c_str()) == 0) { - fractionArray = columnArray->children[i]; - } else { - // do nothing - } - } - - auto timeunit = - has_overflow_to_downscale ? NANOARROW_TIME_UNIT_MICRO : NANOARROW_TIME_UNIT_NANO; - if (!timezone.empty()) { - returnCode = ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_TIMESTAMP, timeunit, - timezone.c_str()); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema " - "type DateTime, error code: %d", - returnCode); - } else { - returnCode = - ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_TIMESTAMP, timeunit, NULL); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema " - "type DateTime, error code: %d", - returnCode); - } - returnCode = ArrowSchemaSetName(newSchema, field->schema->name); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting schema name, error code: %d", - returnCode); - - returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error initializing " - "ArrowArrayView from schema : %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - - for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { - if (!ArrowArrayViewIsNull(columnArray, rowIdx)) { - int64_t epoch = ArrowArrayViewGetIntUnsafe(epochArray, rowIdx); - int64_t fraction = ArrowArrayViewGetIntUnsafe(fractionArray, rowIdx); - if (has_overflow_to_downscale) { - val = epoch * sf::internal::powTenSB4[6] + fraction / 1000; - } else { - val = epoch * sf::internal::powTenSB4[9] + fraction; - } - returnCode = ArrowArrayAppendInt(newArray, val); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error 
appending int to " - "arrow array, error code: %d", - returnCode); - } else { - returnCode = ArrowArrayAppendNull(newArray, 1); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending null to " - "arrow array, error code: %d", - returnCode); - } - } - } else if (field->type == NANOARROW_TYPE_INT64) { - auto timeunit = - has_overflow_to_downscale ? NANOARROW_TIME_UNIT_MICRO : NANOARROW_TIME_UNIT_NANO; - if (!timezone.empty()) { - returnCode = ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_TIMESTAMP, timeunit, - timezone.c_str()); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema " - "type DateTime, error code: %d", - returnCode); - } else { - returnCode = - ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_TIMESTAMP, timeunit, NULL); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema " - "type DateTime, error code: %d", - returnCode); - } - returnCode = ArrowSchemaSetName(newSchema, field->schema->name); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting schema name, error code: %d", - returnCode); - - returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error initializing " - "ArrowArrayView from schema : %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - - for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { - if (!ArrowArrayViewIsNull(columnArray, rowIdx)) { - val = ArrowArrayViewGetIntUnsafe(columnArray, rowIdx); - val *= sf::internal::powTenSB4[9 - scale]; - returnCode = ArrowArrayAppendInt(newArray, val); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending int to " - "arrow array, error code: %d", - returnCode); - } else { - returnCode = ArrowArrayAppendNull(newArray, 1); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending null to " - "arrow array, error code: %d", - returnCode); - } - } - } - } - - 
returnCode = ArrowArrayFinishBuildingDefault(newArray, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error finishing building arrow " - "array: %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - field->schema->release(field->schema); - ArrowSchemaMove(newSchema, field->schema); - columnArray->array->release(columnArray->array); - ArrowArrayMove(newArray, columnArray->array); -} - -void CArrowTableIterator::convertTimestampTZColumn_nanoarrow(ArrowSchemaView* field, - ArrowArrayView* columnArray, - const int scale, const int byteLength, - const std::string timezone) { - int returnCode = 0; - nanoarrow::UniqueSchema newUniqueField; - nanoarrow::UniqueArray newUniqueArray; - ArrowSchema* newSchema = newUniqueField.get(); - ArrowArray* newArray = newUniqueArray.get(); - ArrowError error; - ArrowSchemaInit(newSchema); - newSchema->flags &= (field->schema->flags & ARROW_FLAG_NULLABLE); // map to nullable() - - // Find epoch and fraction arrays - ArrowArrayView* epochArray = nullptr; - ArrowArrayView* fractionArray = nullptr; - for (int64_t i = 0; i < field->schema->n_children; i++) { - ArrowSchema* c_schema = field->schema->children[i]; - if (std::strcmp(c_schema->name, internal::FIELD_NAME_EPOCH.c_str()) == 0) { - epochArray = columnArray->children[i]; - } else if (std::strcmp(c_schema->name, internal::FIELD_NAME_FRACTION.c_str()) == 0) { - fractionArray = columnArray->children[i]; - } else { - // do nothing - } - } - - // When m_force_microsecond_precision is true, always use microsecond - // precision to ensure consistent schema across all batches - bool has_overflow_to_downscale = m_force_microsecond_precision; - if (!m_force_microsecond_precision && scale > 6 && byteLength == 16) { - has_overflow_to_downscale = - _checkNanosecondTimestampOverflowAndDownscale(columnArray, epochArray, fractionArray); - } - - auto timeunit = NANOARROW_TIME_UNIT_SECOND; - if (scale == 0) { - timeunit = NANOARROW_TIME_UNIT_SECOND; - } else if (scale <= 3) 
{ - timeunit = NANOARROW_TIME_UNIT_MILLI; - } else if (scale <= 6) { - timeunit = NANOARROW_TIME_UNIT_MICRO; - } else { - // Use microsecond precision if we detected overflow, otherwise nanosecond - timeunit = has_overflow_to_downscale ? NANOARROW_TIME_UNIT_MICRO : NANOARROW_TIME_UNIT_NANO; - } - - if (!timezone.empty()) { - returnCode = - ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_TIMESTAMP, timeunit, timezone.c_str()); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema type " - "DateTime, error code: %d", - returnCode); - } else { - returnCode = ArrowSchemaSetTypeDateTime(newSchema, NANOARROW_TYPE_TIMESTAMP, timeunit, NULL); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error setting arrow schema type " - "DateTime, error code: %d", - returnCode); - } - returnCode = ArrowSchemaSetName(newSchema, field->schema->name); - SF_CHECK_ARROW_RC(returnCode, "[Snowflake Exception] error setting schema name, error code: %d", - returnCode); - - returnCode = ArrowArrayInitFromSchema(newArray, newSchema, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error initializing ArrowArrayView " - "from schema : %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - - for (int64_t rowIdx = 0; rowIdx < columnArray->array->length; rowIdx++) { - if (!ArrowArrayViewIsNull(columnArray, rowIdx)) { - if (byteLength == 8) { - int64_t epoch = ArrowArrayViewGetIntUnsafe(epochArray, rowIdx); - if (scale == 0) { - returnCode = ArrowArrayAppendInt(newArray, epoch); - } else if (scale <= 3) { - returnCode = ArrowArrayAppendInt(newArray, epoch * sf::internal::powTenSB4[3 - scale]); - } else if (scale <= 6) { - returnCode = ArrowArrayAppendInt(newArray, epoch * sf::internal::powTenSB4[6 - scale]); - } else { - // Handle overflow by falling back to microsecond precision - if (has_overflow_to_downscale) { - returnCode = ArrowArrayAppendInt(newArray, epoch * sf::internal::powTenSB4[6]); - } else { - returnCode = 
ArrowArrayAppendInt(newArray, epoch * sf::internal::powTenSB4[9 - scale]); - } - } - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending int to " - "arrow array, error code: %d", - returnCode); - } else if (byteLength == 16) { - int64_t epoch = ArrowArrayViewGetIntUnsafe(epochArray, rowIdx); - int64_t fraction = ArrowArrayViewGetIntUnsafe(fractionArray, rowIdx); - if (scale == 0) { - returnCode = ArrowArrayAppendInt(newArray, epoch); - } else if (scale <= 3) { - returnCode = ArrowArrayAppendInt(newArray, epoch * sf::internal::powTenSB4[3 - scale] + - fraction / sf::internal::powTenSB4[6]); - } else if (scale <= 6) { - returnCode = ArrowArrayAppendInt( - newArray, epoch * sf::internal::powTenSB4[6] + fraction / sf::internal::powTenSB4[3]); - } else { - // Handle overflow by falling back to microsecond precision - if (has_overflow_to_downscale) { - returnCode = - ArrowArrayAppendInt(newArray, epoch * sf::internal::powTenSB4[6] + fraction / 1000); - } else { - returnCode = - ArrowArrayAppendInt(newArray, epoch * sf::internal::powTenSB4[9] + fraction); - } - } - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending int to " - "arrow array, error code: %d", - returnCode); - } else { - std::string errorInfo = Logger::formatString( - "[Snowflake Exception] unknown arrow internal data type(%d) " - "for TIMESTAMP_TZ data", - NANOARROW_TYPE_ENUM_STRING[field->type]); - logger->error(__FILE__, __func__, __LINE__, errorInfo.c_str()); - PyErr_SetString(PyExc_Exception, errorInfo.c_str()); - return; - } - } else { - returnCode = ArrowArrayAppendNull(newArray, 1); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error appending null to arrow " - "array, error code: %d", - returnCode); - } - } - - returnCode = ArrowArrayFinishBuildingDefault(newArray, &error); - SF_CHECK_ARROW_RC(returnCode, - "[Snowflake Exception] error finishing building arrow " - "array: %s, error code: %d", - ArrowErrorMessage(&error), returnCode); - 
field->schema->release(field->schema); - ArrowSchemaMove(newSchema, field->schema); - columnArray->array->release(columnArray->array); - ArrowArrayMove(newArray, columnArray->array); -} - -bool CArrowTableIterator::convertRecordBatchesToTable_nanoarrow() { - // only do conversion once and there exist some record batches - if (!m_tableConverted && m_ipcArrowArrayViewVec.size() > 0) { - reconstructRecordBatches_nanoarrow(); - return true; - } - return false; -} - -std::vector CArrowTableIterator::getArrowArrayPtrs() { - std::vector ret; - for (size_t i = 0; i < m_ipcArrowArrayVec.size(); i++) { - ret.push_back((uintptr_t)(void*)(m_ipcArrowArrayVec[i].get())); - } - return ret; -} - -std::vector CArrowTableIterator::getArrowSchemaPtrs() { - std::vector ret; - for (size_t i = 0; i < m_ipcSchemaArrayVec.size(); i++) { - ret.push_back((uintptr_t)(void*)(m_ipcSchemaArrayVec[i].get())); - } - return ret; -} - -} // namespace sf diff --git a/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/CArrowTableIterator.hpp b/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/CArrowTableIterator.hpp deleted file mode 100644 index 44edd3edf..000000000 --- a/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/CArrowTableIterator.hpp +++ /dev/null @@ -1,142 +0,0 @@ -#ifndef PC_ARROWTABLEITERATOR_HPP -#define PC_ARROWTABLEITERATOR_HPP - -#include -#include -#include - -#include "CArrowIterator.hpp" -#include "nanoarrow.h" -#include "nanoarrow.hpp" - -namespace sf { - -/** - * Arrow table iterator implementation in C++. - * The caller will ask for an Arrow Table to be returned back to Python - * This conversion is zero-copy, just aggregate every columns from multiple - * record batches and build a new table. 
- */ -class CArrowTableIterator : public CArrowIterator { - public: - /** - * Constructor - * - * @param context Python context object for conversions - * @param arrow_bytes Arrow IPC bytes - * @param arrow_bytes_size Size of the arrow bytes - * @param number_to_decimal Whether to convert numbers to decimal - * @param force_microsecond_precision When true, all timestamp columns are converted - * to microsecond precision, ensuring consistent schema across all batches. - * This is useful when your data contains timestamps outside the nanosecond - * range (1677-2262), such as '9999-12-31' or '0001-01-01'. When false - * (default), precision is determined per-batch based on the data, which - * may cause pyarrow schema mismatch errors when combining batches. - * Note: enabling this truncates sub-microsecond precision (scale 7-9). - */ - CArrowTableIterator(PyObject* context, char* arrow_bytes, int64_t arrow_bytes_size, - bool number_to_decimal, bool force_microsecond_precision = false); - - /** - * Destructor - */ - ~CArrowTableIterator() = default; - - /** - * @return an arrow table containing all data in all record batches - */ - ReturnVal next() override; - std::vector getArrowArrayPtrs() override; - std::vector getArrowSchemaPtrs() override; - - private: - // nanoarrow data - std::vector m_ipcSchemaArrayVec; - - bool m_tableConverted = false; - - /** arrow format convert context for the current session */ - PyObject* m_context; - - /** local time zone */ - char* m_timezone; - const bool m_convert_number_to_decimal; - /** force microsecond precision for timestamps to ensure consistent schema between batches */ - const bool m_force_microsecond_precision; - - /** - * Reconstruct record batches with type conversion in place - */ - void reconstructRecordBatches(); - void convertIfNeeded(ArrowSchema* columnSchema, ArrowArrayView* columnArray); - void reconstructRecordBatches_nanoarrow(); - - /** - * Convert all current RecordBatches to Arrow Table - * @return if 
conversion is executed at first time and successfully - */ - bool convertRecordBatchesToTable_nanoarrow(); - - /** - * convert scaled fixed number column to Decimal, or Double column based on - * setting - */ - void convertScaledFixedNumberColumn_nanoarrow(ArrowSchemaView* field, ArrowArrayView* columnArray, - const unsigned int scale); - - /** - * convert scaled fixed number column to Decimal column - */ - void convertScaledFixedNumberColumnToDecimalColumn_nanoarrow(ArrowSchemaView* field, - ArrowArrayView* columnArray, - const unsigned int scale); - - /** - * convert scaled fixed number column to Double column - */ - void convertScaledFixedNumberColumnToDoubleColumn_nanoarrow(ArrowSchemaView* field, - ArrowArrayView* columnArray, - const unsigned int scale); - - /** - * convert Snowflake Time column (Arrow int32/int64) to Arrow Time column - * Since Python/Pandas Time does not support nanoseconds, this function - * truncates values to microseconds if necessary - */ - void convertTimeColumn_nanoarrow(ArrowSchemaView* field, ArrowArrayView* columnArray, - const int scale); - - /** - * convert Snowflake Interval Day-Time column (Arrow int64/decimal128) to - * Arrow Duration column - */ - void convertIntervalDayTimeColumn_nanoarrow(ArrowSchemaView* field, ArrowArrayView* columnArray, - const int scale); - - /** - * convert Snowflake TimestampNTZ/TimestampLTZ column to Arrow Timestamp - * column - */ - void convertTimestampColumn_nanoarrow(ArrowSchemaView* field, ArrowArrayView* columnArray, - const int scale, const std::string timezone = ""); - - /** - * convert Snowflake TimestampTZ column to Arrow Timestamp column in UTC - * Arrow Timestamp does not support time zone info in each value, so this - * method convert TimestampTZ to Arrow timestamp with UTC timezone - */ - void convertTimestampTZColumn_nanoarrow(ArrowSchemaView* field, ArrowArrayView* columnArray, - const int scale, const int byteLength, - const std::string timezone); - - /** - * convert scaled fixed 
number to double - * if scale is small, then just divide based on the scale; otherwise, convert - * the value to string first and then convert to double to avoid precision - * loss - */ - template - double convertScaledFixedNumberToDouble(const unsigned int scale, T originalValue); -}; -} // namespace sf -#endif // PC_ARROWTABLEITERATOR_HPP diff --git a/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/arrow_stream_iterator.pyx b/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/arrow_stream_iterator.pyx index c8e8f8426..bdbda47c3 100644 --- a/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/arrow_stream_iterator.pyx +++ b/python/src/snowflake/connector/_internal/nanoarrow_cpp/ArrowIterator/arrow_stream_iterator.pyx @@ -2,9 +2,14 @@ # cython: language_level=3 from cpython.ref cimport PyObject -from libc.stdint cimport int64_t +from libc.stdint cimport int64_t, uintptr_t from libcpp.memory cimport unique_ptr +try: + import pyarrow as pa +except ImportError: + pass + # Import ReturnVal from C++ (defined in CArrowIterator.hpp) cdef extern from "CArrowIterator.hpp" namespace "sf": @@ -26,17 +31,32 @@ cdef extern from "CArrowStreamIterator.hpp" namespace "sf": ReturnVal next() +cdef extern from "CArrowStreamTableIterator.hpp" namespace "sf": + cdef cppclass CArrowStreamTableIterator: + @staticmethod + unique_ptr[CArrowStreamTableIterator] from_stream( + int64_t stream_ptr, + PyObject* context, + bint number_to_decimal, + bint force_microsecond_precision, + ) + ReturnVal next() + uintptr_t getArrowArrayPtr() + uintptr_t getArrowSchemaPtr() + uintptr_t getConvertedSchemaPtr() + + cdef class ArrowStreamIterator: """ Python wrapper for C++ Arrow stream iterator. - + Reads directly from an ArrowArrayStream pointer. The C++ implementation uses Py_BEGIN_ALLOW_THREADS/Py_END_ALLOW_THREADS to release the GIL during potentially blocking I/O operations (e.g., fetching data chunks from S3). 
""" cdef unique_ptr[CArrowStreamIterator] iterator cdef object arrow_context - + def __cinit__( self, int64_t stream_ptr, @@ -46,7 +66,7 @@ cdef class ArrowStreamIterator: ): """ Initialize the stream iterator. - + Parameters ---------- stream_ptr : int @@ -59,7 +79,7 @@ cdef class ArrowStreamIterator: If True, use numpy types for numeric data """ self.arrow_context = arrow_context - + # Create the C++ stream iterator using factory method self.iterator = CArrowStreamIterator.from_stream( stream_ptr, @@ -67,29 +87,86 @@ cdef class ArrowStreamIterator: use_numpy, use_dict_result ) - + # Check if creation failed (nullptr returned) if self.iterator.get() == NULL: raise RuntimeError("Failed to initialize stream iterator") - + def __iter__(self): return self - + def __next__(self): """Get next row from stream.""" cdef ReturnVal ret - + ret = self.iterator.get().next() - + # Check for exception if ret.exception != NULL: error_msg = ret.exception raise RuntimeError(f"Error converting row: {error_msg}") - + # Check for end of iteration if ret.successObj == NULL: raise StopIteration - + # Return the row row = ret.successObj return row + +cdef class ArrowStreamTableIterator: + """ + Python iterator that reads batches from an ArrowArrayStream and yields + one pyarrow.RecordBatch per batch, with Snowflake type conversions + applied via ArrowTableConverter. 
+ """ + cdef unique_ptr[CArrowStreamTableIterator] iterator + cdef object arrow_context + + def __cinit__( + self, + int64_t stream_ptr, + object arrow_context, + bint number_to_decimal = False, + bint force_microsecond_precision = False, + ): + self.arrow_context = arrow_context + + self.iterator = CArrowStreamTableIterator.from_stream( + stream_ptr, + arrow_context, + number_to_decimal, + force_microsecond_precision, + ) + if self.iterator.get() == NULL: + raise RuntimeError("Failed to initialize ArrowStreamTableIterator") + + def __iter__(self): + return self + + def __next__(self): + cdef ReturnVal ret + cdef uintptr_t array_ptr + cdef uintptr_t schema_ptr + + ret = self.iterator.get().next() + + if ret.exception != NULL: + error_msg = ret.exception + raise RuntimeError(f"Error in dummy iterator: {error_msg}") + + if ret.successObj == NULL: + raise StopIteration + + if ret.successObj is None: + raise StopIteration + + array_ptr = self.iterator.get().getArrowArrayPtr() + schema_ptr = self.iterator.get().getArrowSchemaPtr() + + return pa.RecordBatch._import_from_c(array_ptr, schema_ptr) + + def get_converted_schema(self): + cdef uintptr_t schema_ptr = self.iterator.get().getConvertedSchemaPtr() + + return pa.Schema._import_from_c(schema_ptr) diff --git a/python/src/snowflake/connector/connection.py b/python/src/snowflake/connector/connection.py index 04d51f6e9..10240d8bb 100644 --- a/python/src/snowflake/connector/connection.py +++ b/python/src/snowflake/connector/connection.py @@ -140,6 +140,9 @@ def __init__( self._messages: list[tuple[type[Exception], dict[str, str | bool]]] = [] self._errorhandler: Callable + # other connection properties + self._arrow_number_to_decimal: bool = False + @pep249 def close(self) -> None: """Close the connection now.""" @@ -523,11 +526,21 @@ def use_openssl_only(self) -> bool: @property def arrow_number_to_decimal(self) -> bool: """Whether to convert Arrow numeric types to Python ``Decimal`` instead of ``float``.""" - raise 
NotImplementedError("arrow_number_to_decimal is not yet implemented") + return self._arrow_number_to_decimal @arrow_number_to_decimal.setter def arrow_number_to_decimal(self, value: bool) -> None: - raise NotImplementedError("arrow_number_to_decimal is not yet implemented") + self._arrow_number_to_decimal = bool(value) + + @backward_compatibility + @arrow_number_to_decimal.setter # type: ignore[attr-defined, untyped-decorator] + def arrow_number_to_decimal_setter(self, value: bool) -> None: + """Set arrow_number_to_decimal field. Deprecated. + + Allows setting this field through `cursor.connection.arrow_number_to_decimal_setter = True`. + Added only because of backwards compatibility, correct setter should be used. + """ + self.arrow_number_to_decimal = value @property def validate_default_parameters(self) -> bool: diff --git a/python/src/snowflake/connector/cursor.py b/python/src/snowflake/connector/cursor.py index fdf1379f2..25911c598 100644 --- a/python/src/snowflake/connector/cursor.py +++ b/python/src/snowflake/connector/cursor.py @@ -13,6 +13,7 @@ import abc import ctypes +import enum import functools from collections.abc import Iterator, Sequence @@ -21,7 +22,7 @@ from snowflake.connector._internal.errorcode import ER_CONNECTION_IS_CLOSED, ER_CURSOR_IS_CLOSED from ._internal.arrow_context import ArrowConverterContext -from ._internal.arrow_stream_iterator import ArrowStreamIterator +from ._internal.arrow_stream_iterator import ArrowStreamIterator, ArrowStreamTableIterator from ._internal.binding_converters import ( ClientSideBindingConverter, JsonBindingConverter, @@ -38,13 +39,13 @@ StatementReleaseRequest, StatementSetSqlQueryRequest, ) -from ._internal.type_codes import get_type_code +from ._internal.type_codes import FIXED, get_type_code from .errors import InterfaceError, NotSupportedError, ProgrammingError if TYPE_CHECKING: from pandas import DataFrame - from pyarrow import Table + from pyarrow import Schema, Table from .connection import Connection @@ 
-54,31 +55,6 @@ F = TypeVar("F", bound=Callable[..., Any]) -def _requires_not_closed(func: F) -> F: - - @functools.wraps(func) - def wrapper(self: SnowflakeCursorBase, *args: Any, **kwargs: Any) -> Any: - if self._closed: - raise InterfaceError("Cursor is closed.", errno=ER_CURSOR_IS_CLOSED) - - return func(self, *args, **kwargs) - - return cast(F, wrapper) - - -def _requires_open_connection(func: F) -> F: - """Raise InterfaceError if the underlying connection is closed.""" - - @functools.wraps(func) - def wrapper(self: SnowflakeCursorBase, *args: Any, **kwargs: Any) -> Any: - if self.connection.is_closed(): - raise InterfaceError("Connection is closed.", errno=ER_CONNECTION_IS_CLOSED) - - return func(self, *args, **kwargs) - - return cast(F, wrapper) - - class ResultMetadata(NamedTuple): """PEP 249 column description entry. @@ -122,6 +98,64 @@ def from_column(cls, col: Any) -> ResultMetadata: ResultMetadataV2 = ResultMetadata +class FetchMode(enum.Enum): + """Distinguishes row-by-row fetching from Arrow/Pandas fetching. + + Once a cursor begins consuming results with one mode, switching to + the other is disallowed until a new ``execute()`` resets state. 
+ """ + + ROW = "row" + ARROW = "arrow" + + +def _requires_not_closed(func: F) -> F: + + @functools.wraps(func) + def wrapper(self: SnowflakeCursorBase, *args: Any, **kwargs: Any) -> Any: + if self._closed: + raise InterfaceError("Cursor is closed.", errno=ER_CURSOR_IS_CLOSED) + + return func(self, *args, **kwargs) + + return cast(F, wrapper) + + +def _requires_open_connection(func: F) -> F: + """Raise InterfaceError if the underlying connection is closed.""" + + @functools.wraps(func) + def wrapper(self: SnowflakeCursorBase, *args: Any, **kwargs: Any) -> Any: + if self.connection.is_closed(): + raise InterfaceError("Connection is closed.", errno=ER_CONNECTION_IS_CLOSED) + + return func(self, *args, **kwargs) + + return cast(F, wrapper) + + +def _requires_fetch_mode(mode: FetchMode) -> Callable[[F], F]: + """Validate and lock the cursor's fetch mode before entering the wrapped method.""" + + def decorator(func: F) -> F: + @functools.wraps(func) + def wrapper(self: SnowflakeCursorBase, *args: Any, **kwargs: Any) -> Any: + if self._fetch_mode and self._fetch_mode != mode: + if mode == FetchMode.ARROW: + raise ProgrammingError("Cannot use arrow/pandas fetch methods after row-by-row fetching") + elif mode == FetchMode.ROW: + raise ProgrammingError("Cannot use row-by-row fetch methods after arrow/pandas fetching") + else: + raise ProgrammingError(f"Unexpected fetch mode: {mode}") + self._fetch_mode = mode + + return func(self, *args, **kwargs) + + return cast(F, wrapper) + + return decorator + + class SnowflakeCursorBase(abc.ABC): """ Base cursor class for database operations (PEP 249). 
@@ -145,11 +179,9 @@ def __init__(self, connection: Connection) -> None: self._sqlstate: str | None = None self._closed = False # Streaming state for Arrow results - self._reader = None - self._current_batch = None - self._current_row_in_batch = 0 self.execute_result: ExecuteResult | None = None self._iterator: Iterator[Row] | None = None + self._fetch_mode: FetchMode | None = None # Query bindings - keep binding data reference to prevent garbage collection while Rust uses it self._binding_data: None | bytes = None self._messages: list[tuple[type[Exception], dict[str, str | bool]]] = [] @@ -394,6 +426,7 @@ def execute( # Reset streaming state for a new result self._binding_data = None self._iterator = None + self._fetch_mode = None self._rownumber = -1 # Populate description, rowcount, and sqlstate @@ -535,11 +568,25 @@ def _get_iterator(self) -> ArrowStreamIterator: use_numpy=False, ) + def _get_table_iterator( + self, + force_microsecond_precision: bool = False, + ) -> ArrowStreamTableIterator: + stream_ptr = self._get_stream_ptr() + arrow_context = ArrowConverterContext() + return ArrowStreamTableIterator( + stream_ptr, + arrow_context, + number_to_decimal=self._connection.arrow_number_to_decimal, + force_microsecond_precision=force_microsecond_precision, + ) + # ------------------------------------------------------------------ # Fetch – shared implementation # ------------------------------------------------------------------ @_requires_not_closed + @_requires_fetch_mode(FetchMode.ROW) def _fetchone(self) -> Row | DictRow | None: """Fetch the next row internally. @@ -592,6 +639,7 @@ def fetchmany(self, size: int | None = None) -> list[Any]: @pep249 @_requires_not_closed + @_requires_fetch_mode(FetchMode.ROW) def fetchall(self) -> list[Any]: """ Fetch all (remaining) rows of a query result. 
@@ -808,32 +856,74 @@ def query_result(self, qid: str) -> SnowflakeCursorBase: """Query the result of a previously executed query.""" raise NotImplementedError("query_result is not yet implemented") + @_requires_not_closed @requires_dependency(pyarrow) + @_requires_fetch_mode(FetchMode.ARROW) def fetch_arrow_batches( self, force_microsecond_precision: bool = False, ) -> Iterator[Table]: """Fetch Arrow Tables in batches.""" - raise NotImplementedError("fetch_arrow_batches is not yet implemented") + iterator = self._get_table_iterator( + force_microsecond_precision=force_microsecond_precision, + ) + for batch in iterator: + yield pyarrow.Table.from_batches([batch]) + @_requires_not_closed @requires_dependency(pyarrow) + @_requires_fetch_mode(FetchMode.ARROW) def fetch_arrow_all( self, force_return_table: bool = False, force_microsecond_precision: bool = False, ) -> Table | None: """Fetch all results as a single Arrow Table.""" - raise NotImplementedError("fetch_arrow_all is not yet implemented") + iterator = self._get_table_iterator( + force_microsecond_precision=force_microsecond_precision, + ) + batches = list(iterator) + if not batches: + if force_return_table: + schema = iterator.get_converted_schema() + normalized_schema = self._normalize_fixed_column_types(schema) + return normalized_schema.empty_table() + return None + return pyarrow.Table.from_batches(batches) + + def _normalize_fixed_column_types(self, schema: Schema) -> Schema: + """Rewrite FIXED columns in an empty-result schema to int64 for backward compatibility. + When the result set has zero rows, the core chooses a narrower integer type + (i.e. int8) for NUMBER or float64 for SCALED NUMBER. + The old driver always exposed int64 in this case. + We use cursor.description (which is populated from query metadata) to identify FIXED columns and cast them. 
+ """ + if not self._description: + return schema + + new_fields = [] + changed = False + for field, metadata in zip(schema, self._description): + if metadata.type_code == FIXED and field.type != pyarrow.int64(): + new_fields.append(field.with_type(pyarrow.int64())) + changed = True + else: + new_fields.append(field) + return pyarrow.schema(new_fields) if changed else schema + @_requires_not_closed @requires_dependency(pandas) def fetch_pandas_batches(self, **kwargs: Any) -> Iterator[DataFrame]: """Fetch Pandas DataFrames in batches.""" - raise NotImplementedError("fetch_pandas_batches is not yet implemented") + for table in self.fetch_arrow_batches(**kwargs): + yield table.to_pandas() + @_requires_not_closed @requires_dependency(pandas) def fetch_pandas_all(self, **kwargs: Any) -> DataFrame: """Fetch all results as a single Pandas DataFrame.""" - raise NotImplementedError("fetch_pandas_all is not yet implemented") + table: Table = self.fetch_arrow_all(force_return_table=True, **kwargs) + return table.to_pandas() def abort_query(self, qid: str) -> bool: """Abort a running query.""" diff --git a/python/tests/e2e/pandas/test_fetch_arrow.py b/python/tests/e2e/pandas/test_fetch_arrow.py index 8ca43db40..1790bffff 100644 --- a/python/tests/e2e/pandas/test_fetch_arrow.py +++ b/python/tests/e2e/pandas/test_fetch_arrow.py @@ -20,7 +20,7 @@ LARGE_RESULT_SET_ROW_COUNT = 100_000 ARROW_TYPE_CASES = [ - # (type_name, value_expr, null_expr, value_check, arrow_type_check) + # (type_name, value_expr, null_expr, value_check, empty_column_arrow_type_check) ("number", "1::NUMBER", "NULL::NUMBER", lambda v: v == 1, pa.types.is_int64), ("scaled_number", "3.14::NUMBER(10,2)", "NULL::NUMBER(10,2)", lambda v: abs(v - 3.14) < 0.01, pa.types.is_int64), ("varchar", "'hello'::VARCHAR", "NULL::VARCHAR", lambda v: v == "hello", pa.types.is_string), @@ -62,17 +62,16 @@ ] -@pytest.mark.skip_universal("SNOW-3243341 - not implemented yet") class TestFetchArrowAll: """Tests for fetch_arrow_all 
cursor method.""" @pytest.mark.parametrize( - "type_name,value_expr,null_expr,check,_arrow_type_check", + "type_name,value_expr,null_expr,check,_empty_column_arrow_type_check", ARROW_TYPE_CASES, ids=[c[0] for c in ARROW_TYPE_CASES], ) def test_should_fetch_type_name_with_null_as_pyarrow_table( - self, execute_query, cursor, type_name, value_expr, null_expr, check, _arrow_type_check + self, execute_query, cursor, type_name, value_expr, null_expr, check, _empty_column_arrow_type_check ): # Given Snowflake client is logged in assert_connection_is_open(execute_query) @@ -110,12 +109,12 @@ def test_should_return_none_from_fetch_arrow_all_for_empty_result_set(self, exec assert result is None @pytest.mark.parametrize( - "type_name,value_expr,_null_expr,_check,type_check", + "type_name,value_expr,_null_expr,_check,empty_column_arrow_type_check", ARROW_TYPE_CASES, ids=[c[0] for c in ARROW_TYPE_CASES], ) def test_should_return_empty_type_name_column_with_correct_arrow_type_when_force_return_table_is_true( - self, execute_query, cursor, type_name, value_expr, _null_expr, _check, type_check + self, execute_query, cursor, type_name, value_expr, _null_expr, _check, empty_column_arrow_type_check ): # Given Snowflake client is logged in assert_connection_is_open(execute_query) @@ -131,7 +130,7 @@ def test_should_return_empty_type_name_column_with_correct_arrow_type_when_force assert result.num_rows == 0 # And Column COL should have Arrow type - assert type_check(result.schema.field("COL").type) + assert empty_column_arrow_type_check(result.schema.field("COL").type) def test_should_convert_scaled_fixed_number_to_decimal_via_fetch_arrow_all(self, execute_query, cursor): # Given Snowflake client is logged in @@ -174,7 +173,6 @@ def test_should_force_microsecond_precision_for_timestamps_via_fetch_arrow_all(s assert ts_val.microsecond == 123456 -@pytest.mark.skip_universal("SNOW-3243341 - not implemented yet") class TestFetchArrowBatches: """Tests for fetch_arrow_batches cursor 
method.""" diff --git a/python/tests/e2e/pandas/test_fetch_pandas.py b/python/tests/e2e/pandas/test_fetch_pandas.py index d92fd1b29..b476b4b08 100644 --- a/python/tests/e2e/pandas/test_fetch_pandas.py +++ b/python/tests/e2e/pandas/test_fetch_pandas.py @@ -22,49 +22,88 @@ LARGE_RESULT_SET_ROW_COUNT = 100_000 PANDAS_TYPE_CASES = [ - ("number", "1::NUMBER", "NULL::NUMBER", lambda v: v == 1), - ("scaled_number", "3.14::NUMBER(10,2)", "NULL::NUMBER(10,2)", lambda v: abs(v - 3.14) < 0.01), - ("varchar", "'hello'::VARCHAR", "NULL::VARCHAR", lambda v: v == "hello"), - ("float", "1.5::FLOAT", "NULL::FLOAT", lambda v: abs(v - 1.5) < 0.01), - ("boolean", "TRUE::BOOLEAN", "NULL::BOOLEAN", lambda v: v), - ("date", "'2026-03-23'::DATE", "NULL::DATE", lambda v: (v.year, v.month, v.day) == (2026, 3, 23)), - ("time", "'12:30:00'::TIME", "NULL::TIME", lambda v: isinstance(v, time) and v == time(12, 30, 0)), + # (type_name, value_expr, null_expr, value_check, empty_column_dtype_check) + ("number", "1::NUMBER", "NULL::NUMBER", lambda v: v == 1, pd.api.types.is_int64_dtype), + ( + "scaled_number", + "3.14::NUMBER(10,2)", + "NULL::NUMBER(10,2)", + lambda v: abs(v - 3.14) < 0.01, + pd.api.types.is_int64_dtype, + ), + ("varchar", "'hello'::VARCHAR", "NULL::VARCHAR", lambda v: v == "hello", pd.api.types.is_string_dtype), + ("float", "1.5::FLOAT", "NULL::FLOAT", lambda v: abs(v - 1.5) < 0.01, pd.api.types.is_float_dtype), + ("boolean", "TRUE::BOOLEAN", "NULL::BOOLEAN", lambda v: v, pd.api.types.is_bool_dtype), + ( + "date", + "'2026-03-23'::DATE", + "NULL::DATE", + lambda v: (v.year, v.month, v.day) == (2026, 3, 23), + pd.api.types.is_object_dtype, + ), + ( + "time", + "'12:30:00'::TIME", + "NULL::TIME", + lambda v: isinstance(v, time) and v == time(12, 30, 0), + pd.api.types.is_object_dtype, + ), ( "timestamp_ntz", "'2026-03-23 10:30:00'::TIMESTAMP_NTZ", "NULL::TIMESTAMP_NTZ", lambda v: isinstance(v, pd.Timestamp) and (v.year, v.month, v.day, v.hour, v.minute) == (2026, 3, 23, 10, 30), + 
pd.api.types.is_datetime64_any_dtype, ), ( "timestamp_ltz", "'2026-03-23 10:30:00'::TIMESTAMP_LTZ", "NULL::TIMESTAMP_LTZ", lambda v: isinstance(v, pd.Timestamp) and (v.year, v.month, v.day, v.hour, v.minute) == (2026, 3, 23, 10, 30), + pd.api.types.is_datetime64_any_dtype, ), ( "timestamp_tz", "'2026-03-23 10:30:00 +0530'::TIMESTAMP_TZ", "NULL::TIMESTAMP_TZ", lambda v: isinstance(v, pd.Timestamp) and (v.year, v.month, v.day, v.hour, v.minute) == (2026, 3, 23, 5, 0), + pd.api.types.is_datetime64_any_dtype, + ), + ( + "binary", + "TO_BINARY('ABCD','HEX')::BINARY", + "NULL::BINARY", + lambda v: v == b"\xab\xcd", + pd.api.types.is_string_dtype, + ), + ("variant", "TO_VARIANT(42)", "NULL::VARIANT", lambda v: json.loads(v) == 42, pd.api.types.is_string_dtype), + ( + "array", + "ARRAY_CONSTRUCT(1,2,3)::ARRAY", + "NULL::ARRAY", + lambda v: json.loads(v) == [1, 2, 3], + pd.api.types.is_string_dtype, + ), + ( + "object", + "OBJECT_CONSTRUCT('key','value')::OBJECT", + "NULL::OBJECT", + lambda v: json.loads(v) == {"key": "value"}, + pd.api.types.is_string_dtype, ), - ("binary", "TO_BINARY('ABCD','HEX')::BINARY", "NULL::BINARY", lambda v: v == b"\xab\xcd"), - ("variant", "TO_VARIANT(42)", "NULL::VARIANT", lambda v: json.loads(v) == 42), - ("array", "ARRAY_CONSTRUCT(1,2,3)::ARRAY", "NULL::ARRAY", lambda v: json.loads(v) == [1, 2, 3]), - ("object", "OBJECT_CONSTRUCT('key','value')::OBJECT", "NULL::OBJECT", lambda v: json.loads(v) == {"key": "value"}), ] -@pytest.mark.skip_universal("SNOW-3243341 - not implemented yet") class TestFetchPandasAll: """Tests for fetch_pandas_all cursor method.""" @pytest.mark.parametrize( - "type_name,value_expr,null_expr,check", + "type_name,value_expr,null_expr,check,_empty_column_dtype_check", PANDAS_TYPE_CASES, ids=[c[0] for c in PANDAS_TYPE_CASES], ) def test_should_fetch_type_name_with_null_as_pandas_dataframe( - self, execute_query, cursor, type_name, value_expr, null_expr, check + self, execute_query, cursor, type_name, value_expr, null_expr, 
check, _empty_column_dtype_check ): # Given Snowflake client is logged in assert_connection_is_open(execute_query) @@ -89,12 +128,19 @@ def test_should_fetch_type_name_with_null_as_pandas_dataframe( # And Column VAL should have the correct value for assert check(result["VAL"].iloc[0]) - def test_should_return_empty_pandas_dataframe_for_empty_result_set(self, execute_query, cursor): + @pytest.mark.parametrize( + "type_name,value_expr,_null_expr,_check,empty_column_dtype_check", + PANDAS_TYPE_CASES, + ids=[c[0] for c in PANDAS_TYPE_CASES], + ) + def test_should_return_empty_type_name_column_with_correct_pandas_dtype( + self, execute_query, cursor, type_name, value_expr, _null_expr, _check, empty_column_dtype_check + ): # Given Snowflake client is logged in assert_connection_is_open(execute_query) - # When Query "SELECT 1 AS id WHERE 1=0" is executed - cursor.execute("SELECT 1 AS id WHERE 1=0") + # When Query "SELECT AS col WHERE 1=0" is executed + cursor.execute(f"SELECT {value_expr} AS col WHERE 1=0") # And fetch_pandas_all is called result: pd.DataFrame = cursor.fetch_pandas_all() @@ -103,6 +149,9 @@ def test_should_return_empty_pandas_dataframe_for_empty_result_set(self, execute assert isinstance(result, pd.DataFrame) assert len(result) == 0 + # And Column COL should have pandas dtype + assert empty_column_dtype_check(result["COL"].dtype) + def test_should_convert_scaled_fixed_number_to_decimal_via_fetch_pandas_all(self, execute_query, cursor): # Given Snowflake client is logged in assert_connection_is_open(execute_query) @@ -145,7 +194,6 @@ def test_should_force_microsecond_precision_for_timestamps_via_fetch_pandas_all( assert ts_val.microsecond == 123456 -@pytest.mark.skip_universal("SNOW-3243341 - not implemented yet") class TestFetchPandasBatches: """Tests for fetch_pandas_batches cursor method.""" diff --git a/python/tests/unit/test_connection.py b/python/tests/unit/test_connection.py index c167d6cb9..cd95f16f2 100644 --- 
a/python/tests/unit/test_connection.py +++ b/python/tests/unit/test_connection.py @@ -413,3 +413,30 @@ def test_is_an_error(self, status, expected): from snowflake.connector.connection import Connection assert Connection.is_an_error(status) == expected + + +class TestConnectionArrowProperties: + """Unit tests for Connection properties (getters/setters).""" + + def test_arrow_number_to_decimal_default_is_false(self, connection): + assert connection.arrow_number_to_decimal is False + + def test_arrow_number_to_decimal_setter_enables(self, connection): + connection.arrow_number_to_decimal = True + assert connection.arrow_number_to_decimal is True + + def test_arrow_number_to_decimal_setter_enables_backward_compatible(self, connection): + connection.arrow_number_to_decimal_setter = True + assert connection.arrow_number_to_decimal is True + + def test_arrow_number_to_decimal_setter_disables(self, connection): + connection.arrow_number_to_decimal = True + connection.arrow_number_to_decimal = False + assert connection.arrow_number_to_decimal is False + + def test_arrow_number_to_decimal_setter_coerces_to_bool(self, connection): + connection.arrow_number_to_decimal = 1 + assert connection.arrow_number_to_decimal is True + + connection.arrow_number_to_decimal = 0 + assert connection.arrow_number_to_decimal is False diff --git a/python/tests/unit/test_cursor.py b/python/tests/unit/test_cursor.py index 2a1cd9e42..83379dcf3 100644 --- a/python/tests/unit/test_cursor.py +++ b/python/tests/unit/test_cursor.py @@ -8,12 +8,17 @@ import pytest from snowflake.connector._internal.errorcode import ER_NO_PYARROW -from snowflake.connector._internal.extras import MissingOptionalDependency +from snowflake.connector._internal.extras import ( + MissingOptionalDependency, +) +from snowflake.connector._internal.extras import ( + check_dependency as _real_check_dependency, +) from snowflake.connector._internal.protobuf_gen.database_driver_v1_pb2 import ( ConnectionHandle, StatementHandle, ) 
-from snowflake.connector.cursor import SnowflakeCursor, SnowflakeCursorBase +from snowflake.connector.cursor import FetchMode, SnowflakeCursor, SnowflakeCursorBase from snowflake.connector.errors import ProgrammingError @@ -794,3 +799,313 @@ def test_error_message_contains_install_link(self, cursor): with patch("snowflake.connector.cursor.pandas", MissingOptionalDependency(dep="pandas")): with pytest.raises(ProgrammingError, match="python-connector-pandas"): cursor.check_can_use_pandas() + + +class TestFetchArrowBatches: + """Unit tests for fetch_arrow_batches.""" + + @pytest.fixture + def mock_connection(self): + return MagicMock() + + @pytest.fixture + def cursor(self, mock_connection): + return SnowflakeCursor(mock_connection) + + @pytest.fixture(autouse=True) + def _patch_pyarrow(self): + mock_pa = MagicMock() + with ( + patch("snowflake.connector._internal.extras.check_dependency"), + patch("snowflake.connector.cursor.pyarrow", new=mock_pa), + ): + self.pa = mock_pa + yield + + def test_yields_tables_from_batches(self, cursor): + batch1, batch2 = MagicMock(), MagicMock() + table1, table2 = MagicMock(), MagicMock() + self.pa.Table.from_batches.side_effect = [table1, table2] + + with patch.object(cursor, "_get_table_iterator", return_value=iter([batch1, batch2])): + tables = list(cursor.fetch_arrow_batches()) + + assert tables == [table1, table2] + self.pa.Table.from_batches.assert_any_call([batch1]) + self.pa.Table.from_batches.assert_any_call([batch2]) + + def test_yields_nothing_for_empty_stream(self, cursor): + with patch.object(cursor, "_get_table_iterator", return_value=iter([])): + tables = list(cursor.fetch_arrow_batches()) + + assert tables == [] + + def test_raises_when_pyarrow_not_installed(self, cursor): + missing = MissingOptionalDependency(dep="pyarrow") + with patch( + "snowflake.connector._internal.extras.check_dependency", + side_effect=lambda _: _real_check_dependency(missing), + ): + with pytest.raises(ProgrammingError, match="pyarrow"): + 
list(cursor.fetch_arrow_batches()) + + def test_passes_force_microsecond_precision(self, cursor): + with patch.object(cursor, "_get_table_iterator", return_value=iter([])) as mock_get: + list(cursor.fetch_arrow_batches(force_microsecond_precision=True)) + + mock_get.assert_called_once_with(force_microsecond_precision=True) + + +class TestFetchArrowAll: + """Unit tests for fetch_arrow_all.""" + + @pytest.fixture + def mock_connection(self): + return MagicMock() + + @pytest.fixture + def cursor(self, mock_connection): + return SnowflakeCursor(mock_connection) + + @pytest.fixture(autouse=True) + def _patch_pyarrow(self): + mock_pa = MagicMock() + with ( + patch("snowflake.connector._internal.extras.check_dependency"), + patch("snowflake.connector.cursor.pyarrow", new=mock_pa), + ): + self.pa = mock_pa + yield + + def test_returns_concatenated_table(self, cursor): + batch1, batch2 = MagicMock(), MagicMock() + mock_table = MagicMock() + self.pa.Table.from_batches.return_value = mock_table + + with patch.object(cursor, "_get_table_iterator", return_value=iter([batch1, batch2])): + result = cursor.fetch_arrow_all() + + assert result is mock_table + self.pa.Table.from_batches.assert_called_once_with([batch1, batch2]) + + def test_returns_none_for_empty_stream(self, cursor): + mock_iterator = MagicMock() + mock_iterator.__iter__ = MagicMock(return_value=iter([])) + + with patch.object(cursor, "_get_table_iterator", return_value=mock_iterator): + result = cursor.fetch_arrow_all() + + assert result is None + + def test_returns_empty_table_with_force_return_table(self, cursor): + mock_empty_table = MagicMock() + mock_schema = MagicMock() + mock_schema.empty_table.return_value = mock_empty_table + + mock_iterator = MagicMock() + mock_iterator.__iter__ = MagicMock(return_value=iter([])) + mock_iterator.get_converted_schema.return_value = mock_schema + + with patch.object(cursor, "_get_table_iterator", return_value=mock_iterator): + result = 
cursor.fetch_arrow_all(force_return_table=True) + + assert result is mock_empty_table + mock_iterator.get_converted_schema.assert_called_once() + mock_schema.empty_table.assert_called_once() + + def test_returns_none_without_force_return_table(self, cursor): + mock_iterator = MagicMock() + mock_iterator.__iter__ = MagicMock(return_value=iter([])) + + with patch.object(cursor, "_get_table_iterator", return_value=mock_iterator): + result = cursor.fetch_arrow_all(force_return_table=False) + + assert result is None + + def test_passes_force_microsecond_precision(self, cursor): + with patch.object(cursor, "_get_table_iterator", return_value=iter([])) as mock_get: + cursor.fetch_arrow_all(force_microsecond_precision=True) + + mock_get.assert_called_once_with(force_microsecond_precision=True) + + +class TestFetchPandasBatches: + """Unit tests for fetch_pandas_batches.""" + + @pytest.fixture + def mock_connection(self): + return MagicMock() + + @pytest.fixture + def cursor(self, mock_connection): + return SnowflakeCursor(mock_connection) + + @pytest.fixture(autouse=True) + def _patch_deps(self): + with patch("snowflake.connector._internal.extras.check_dependency"): + yield + + def test_yields_to_pandas_results(self, cursor): + table1, table2 = MagicMock(), MagicMock() + df1, df2 = MagicMock(), MagicMock() + table1.to_pandas.return_value = df1 + table2.to_pandas.return_value = df2 + + with patch.object(cursor, "fetch_arrow_batches", return_value=iter([table1, table2])): + dfs = list(cursor.fetch_pandas_batches()) + + assert dfs == [df1, df2] + table1.to_pandas.assert_called_once() + table2.to_pandas.assert_called_once() + + def test_raises_when_pandas_not_installed(self, cursor): + missing = MissingOptionalDependency(dep="pandas") + with patch( + "snowflake.connector._internal.extras.check_dependency", + side_effect=lambda _: _real_check_dependency(missing), + ): + with pytest.raises(ProgrammingError, match="pandas"): + list(cursor.fetch_pandas_batches()) + + +class 
TestFetchPandasAll: + """Unit tests for fetch_pandas_all.""" + + @pytest.fixture + def mock_connection(self): + return MagicMock() + + @pytest.fixture + def cursor(self, mock_connection): + return SnowflakeCursor(mock_connection) + + @pytest.fixture(autouse=True) + def _patch_deps(self): + with patch("snowflake.connector._internal.extras.check_dependency"): + yield + + def test_returns_to_pandas_result(self, cursor): + mock_table = MagicMock() + mock_df = MagicMock() + mock_table.to_pandas.return_value = mock_df + + with patch.object(cursor, "fetch_arrow_all", return_value=mock_table): + result = cursor.fetch_pandas_all() + + assert result is mock_df + mock_table.to_pandas.assert_called_once() + + def test_returns_empty_dataframe_for_empty_stream(self, cursor): + mock_empty_table = MagicMock() + mock_empty_df = MagicMock() + mock_empty_table.to_pandas.return_value = mock_empty_df + + with patch.object(cursor, "fetch_arrow_all", return_value=mock_empty_table) as mock_fetch: + result = cursor.fetch_pandas_all() + + assert result is mock_empty_df + mock_fetch.assert_called_once_with(force_return_table=True) + mock_empty_table.to_pandas.assert_called_once() + + def test_raises_when_pandas_not_installed(self, cursor): + missing = MissingOptionalDependency(dep="pandas") + with patch( + "snowflake.connector._internal.extras.check_dependency", + side_effect=lambda _: _real_check_dependency(missing), + ): + with pytest.raises(ProgrammingError, match="pandas"): + cursor.fetch_pandas_all() + + def test_forwards_kwargs_to_fetch_arrow_all(self, cursor): + mock_table = MagicMock() + with patch.object(cursor, "fetch_arrow_all", return_value=mock_table) as mock_fetch: + cursor.fetch_pandas_all(force_microsecond_precision=True) + + mock_fetch.assert_called_once_with(force_return_table=True, force_microsecond_precision=True) + + +class TestFetchModeValidation: + """Unit tests for fetch mode validation (preventing mixed row/arrow fetching).""" + + @pytest.fixture + def 
mock_connection(self): + return MagicMock() + + @pytest.fixture + def cursor(self, mock_connection): + return SnowflakeCursor(mock_connection) + + @pytest.fixture(autouse=True) + def _patch_deps(self): + with ( + patch("snowflake.connector._internal.extras.check_dependency"), + patch("snowflake.connector.cursor.pyarrow", new=MagicMock()), + patch("snowflake.connector.cursor.pandas", new=MagicMock()), + ): + yield + + def test_row_then_arrow_raises(self, cursor): + cursor._iterator = iter([(1,)]) + + with patch.object(cursor, "_get_iterator"): + cursor.fetchone() + + with pytest.raises(ProgrammingError, match="Cannot use arrow/pandas fetch methods"): + list(cursor.fetch_arrow_batches()) + + def test_arrow_then_row_raises(self, cursor): + with patch.object(cursor, "_get_table_iterator", return_value=iter([])): + cursor.fetch_arrow_all() + + with pytest.raises(ProgrammingError, match="Cannot use row-by-row fetch methods"): + cursor.fetchone() + + def test_row_then_pandas_raises(self, cursor): + cursor._iterator = iter([(1,)]) + + with patch.object(cursor, "_get_iterator"): + cursor.fetchone() + + with pytest.raises(ProgrammingError, match="Cannot use arrow/pandas fetch methods"): + list(cursor.fetch_pandas_batches()) + + def test_pandas_then_row_raises(self, cursor): + cursor._fetch_mode = FetchMode.ARROW + + with pytest.raises(ProgrammingError, match="Cannot use row-by-row fetch methods"): + cursor.fetchall() + + def test_fetchall_then_arrow_raises(self, cursor): + cursor._iterator = iter([(1,)]) + + with patch.object(cursor, "_get_iterator"): + cursor.fetchall() + + with pytest.raises(ProgrammingError, match="Cannot use arrow/pandas fetch methods"): + cursor.fetch_arrow_all() + + def test_same_mode_is_fine(self, cursor): + cursor._iterator = iter([(1,), (2,)]) + + with patch.object(cursor, "_get_iterator"): + cursor.fetchone() + cursor.fetchone() + + def test_execute_resets_fetch_mode(self, cursor, mock_connection): + mock_connection.is_closed.return_value = False + 
result = MagicMock() + result.columns = [] + result.HasField.return_value = False + result.sql_state = "" + mock_connection.db_api.statement_execute_query.return_value.result = result + + cursor._fetch_mode = FetchMode.ARROW + with ( + patch("snowflake.connector.cursor.StatementNewRequest"), + patch("snowflake.connector.cursor.StatementSetSqlQueryRequest"), + patch("snowflake.connector.cursor.StatementExecuteQueryRequest"), + patch("snowflake.connector.cursor.StatementReleaseRequest"), + ): + cursor.execute("SELECT 1") + + assert cursor._fetch_mode is None diff --git a/tests/definitions/python/pandas/fetch_pandas.feature b/tests/definitions/python/pandas/fetch_pandas.feature index e8fbb0cc4..9e9ae4ba5 100644 --- a/tests/definitions/python/pandas/fetch_pandas.feature +++ b/tests/definitions/python/pandas/fetch_pandas.feature @@ -33,11 +33,29 @@ Feature: Pandas fetch methods (Python-specific) | object | OBJECT_CONSTRUCT('key','value')::OBJECT | NULL::OBJECT | @python_e2e - Scenario: should return empty pandas DataFrame for empty result set + Scenario Outline: should return empty column with correct pandas dtype Given Snowflake client is logged in - When Query "SELECT 1 AS id WHERE 1=0" is executed + When Query "SELECT <value_expr> AS col WHERE 1=0" is executed And fetch_pandas_all is called Then The result should be a pandas.DataFrame with 0 rows + And Column COL should have pandas dtype <pandas_dtype> + + Examples: + | type_name | value_expr | pandas_dtype | + | number | 1::NUMBER | integer | + | scaled_number | 3.14::NUMBER(10,2) | float | + | varchar | 'hello'::VARCHAR | object | + | float | 1.5::FLOAT | float | + | boolean | TRUE::BOOLEAN | bool | + | date | '2026-03-23'::DATE | object | + | time | '12:30:00'::TIME | object | + | timestamp_ntz | '2026-03-23 10:30:00'::TIMESTAMP_NTZ | datetime64 | + | timestamp_ltz | '2026-03-23 10:30:00'::TIMESTAMP_LTZ | datetime64 | + | timestamp_tz | '2026-03-23 10:30:00 +0530'::TIMESTAMP_TZ | datetime64 | + | binary | TO_BINARY('ABCD','HEX')::BINARY | 
object | + | variant | TO_VARIANT(42) | object | + | array | ARRAY_CONSTRUCT(1,2,3)::ARRAY | object | + | object | OBJECT_CONSTRUCT('key','value')::OBJECT | object | @python_e2e Scenario: should convert scaled fixed number to decimal via fetch_pandas_all