diff --git a/build.sbt b/build.sbt index 72a19343b9..ce002e07b2 100644 --- a/build.sbt +++ b/build.sbt @@ -41,6 +41,7 @@ val slf4jVersion = "1.6.6" val thriftVersion = "0.9.3" val junitVersion = "4.10" val jlineVersion = "2.14.3" +val hiveVersion = "2.2.0" val printDependencyClasspath = taskKey[Unit]("Prints location of the dependencies") @@ -423,8 +424,9 @@ lazy val scaldingParquet = module("parquet").settings( "org.scala-lang" % "scala-reflect" % scalaVersion.value, "com.twitter" %% "bijection-macros" % bijectionVersion, "com.twitter" %% "chill-bijection" % chillVersion, - "com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion % "test" - ), + "com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion % "test", + "org.apache.hive" % "hive-exec" % hiveVersion % "test" intransitive() + ), addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full)) .dependsOn(scaldingCore, scaldingHadoopTest % "test", scaldingParquetFixtures % "test->test") @@ -436,7 +438,8 @@ lazy val scaldingParquetScroogeFixtures = module("parquet-scrooge-fixtures") scroogeLanguages in Test := Seq("java", "scala"), libraryDependencies ++= Seq( "com.twitter" %% "scrooge-serializer" % scroogeVersion % "provided" - exclude("com.google.guava", "guava"), + exclude("com.google.guava", "guava") + exclude("org.apache.hive", "hive-exec" ), "commons-lang" % "commons-lang" % apacheCommonsVersion, // needed for HashCodeBuilder used in thriftjava "org.apache.thrift" % "libthrift" % thriftVersion ) @@ -450,14 +453,14 @@ lazy val scaldingParquetScrooge = module("parquet-scrooge") "org.apache.parquet" % "parquet-thrift" % parquetVersion % "test" classifier "tests" exclude("org.apache.parquet", "parquet-pig") exclude("com.twitter.elephantbird", "elephant-bird-pig") - exclude("com.twitter.elephantbird", "elephant-bird-core"), + exclude("com.twitter.elephantbird", "elephant-bird-core") + exclude("org.apache.hive", "hive-exec" ), "com.twitter" %% "scrooge-serializer" % scroogeVersion exclude("com.google.guava", "guava"), "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", "com.novocode" % "junit-interface" % "0.11" % "test", "junit" % "junit" % junitVersion % "test" - - ) + ) ).dependsOn(scaldingCore, scaldingParquet % "compile->compile;test->test", scaldingParquetScroogeFixtures % "test->test") lazy val scaldingHRaven = module("hraven").settings( diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/convert/DecimalUtils.java b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/convert/DecimalUtils.java new file mode 100644 index 0000000000..ec869ecb9c --- /dev/null +++ b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/convert/DecimalUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.scalding.parquet.convert; + +import static java.lang.Math.pow; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import org.apache.parquet.io.api.Binary; + +/** + * @see org.apache.parquet.pig.convert.DecimalUtils + */ +public class DecimalUtils { + + private DecimalUtils() { + } + + public static BigDecimal binaryToDecimal(Binary value, int precision, int scale) { + /* + * Precision <= 18 checks for the max number of digits for an unscaled long, + * else treat with big integer conversion + */ + if (precision <= 18) { + ByteBuffer buffer = value.toByteBuffer(); + byte[] bytes = buffer.array(); + int start = buffer.arrayOffset() + buffer.position(); + int end = buffer.arrayOffset() + buffer.limit(); + long unscaled = 0L; + int i = start; + while (i < end) { + unscaled = (unscaled << 8 | bytes[i] & 0xff); + i++; + } + int bits = 8 * (end - start); + long unscaledNew = (unscaled << (64 - bits)) >> (64 - bits); + if (unscaledNew <= -pow(10, 18) || unscaledNew >= pow(10, 18)) { + return new BigDecimal(unscaledNew); + } else { + return BigDecimal.valueOf(unscaledNew / pow(10, scale)); + } + } else { + return new BigDecimal(new BigInteger(value.getBytes()), scale); + } + } +} \ No newline at end of file diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/convert/ParquetTimestampUtils.java b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/convert/ParquetTimestampUtils.java new file mode 100644 index 0000000000..9781d32d51 --- /dev/null +++ b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/convert/ParquetTimestampUtils.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.scalding.parquet.convert; + +import java.sql.Timestamp; +import org.apache.parquet.example.data.simple.NanoTime; +import org.apache.parquet.io.api.Binary; + +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +/** + * Utility class for decoding INT96 encoded parquet timestamp to timestamp millis in GMT. This class + * is equivalent of @see org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime, which produces + * less intermediate objects during decoding. Derived from: + * + * @link https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTimestampUtils.java + * @link https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/example/data/simple/NanoTime.java + */ +public final class ParquetTimestampUtils { + + private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588; + private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1); + private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1); + private static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1); + + private ParquetTimestampUtils() { + } + + /** + * Returns GMT timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of + * day nanos). + * + * @param timestampBinary INT96 parquet timestamp + * @return timestamp in millis, GMT timezone + */ + public static long getTimestampMillis(Binary timestampBinary) { + if (timestampBinary.length() != 12) { + throw new IllegalArgumentException( + "Parquet timestamp must be 12 bytes, actual " + timestampBinary.length()); + } + byte[] bytes = timestampBinary.getBytes(); + + // little endian encoding - need to invert byte order + long timeOfDayNanos = ByteBuffer.wrap(new byte[]{bytes[7], bytes[6], bytes[5], bytes[4], + bytes[3], bytes[2], bytes[1], bytes[0]}).getLong(); + int julianDay = ByteBuffer.wrap(new byte[]{bytes[11], bytes[10], bytes[9], bytes[8]}).getInt(); + + return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND); + } + + private static long julianDayToMillis(int julianDay) { + return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY; + } + + public static Timestamp asTimestamp(Binary timestampBinary) { + NanoTime nanoTime = NanoTime.fromBinary(timestampBinary); + Timestamp ts = new Timestamp(julianDayToMillis(nanoTime.getJulianDay()) + + (nanoTime.getTimeOfDayNanos() / NANOS_PER_MILLISECOND)); + ts.setNanos((int) (nanoTime.getTimeOfDayNanos() % NANOS_PER_SECOND)); + return ts; + } + +} \ No newline at end of file diff --git a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverter.java b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverter.java index 4f313d7392..e36c71da14 100644 --- a/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverter.java +++ b/scalding-parquet/src/main/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverter.java @@ -2,11 +2,18 @@ import cascading.tuple.Tuple; +import com.twitter.scalding.parquet.convert.DecimalUtils; +import com.twitter.scalding.parquet.convert.ParquetTimestampUtils; +import java.math.BigDecimal; +import java.sql.Timestamp; + import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.DecimalMetadata; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; public class ParquetTupleConverter extends GroupConverter { @@ -25,10 +32,18 @@ public ParquetTupleConverter(GroupType parquetSchema) { } private Converter newConverter(Type type, int i) { - if(!type.isPrimitive()) { + if (!type.isPrimitive()) { throw new IllegalArgumentException("cascading can only build tuples from primitive types"); } else { - return new TuplePrimitiveConverter(this, i); + Converter converter = null; + if (TupleDecimalConverter.accepts(type)) { + converter = new TupleDecimalConverter(type, this, i); + } else if (TupleInt96DateConverter.accepts(type)) { + converter = new TupleInt96DateConverter(this, i); + } else { + converter = new TuplePrimitiveConverter(this, i); + } + return converter; } } @@ -51,6 +66,7 @@ final public Tuple getCurrentTuple() { } static final class TuplePrimitiveConverter extends PrimitiveConverter { + private final ParquetTupleConverter parent; private final int index; @@ -89,4 +105,98 @@ public void addLong(long value) { parent.getCurrentTuple().setLong(index, value); } } + + static abstract class BaseTupleConverter extends PrimitiveConverter { + + protected final ParquetTupleConverter parent; + protected final int index; + + public BaseTupleConverter(ParquetTupleConverter parent, int index) { + this.parent = parent; + this.index = index; + } + + abstract public void addBinary(Binary value); + + @Override + public void addBoolean(boolean value) { + parent.getCurrentTuple().setBoolean(index, value); + } + + @Override + public void addDouble(double value) { + parent.getCurrentTuple().setDouble(index, value); + } + + @Override + public void addFloat(float value) { + parent.getCurrentTuple().setFloat(index, value); + } + + @Override + public void addInt(int value) { + parent.getCurrentTuple().setInteger(index, value); + } + + @Override + public void addLong(long value) { + parent.getCurrentTuple().setLong(index, value); + } + } + + static final class TupleDecimalConverter extends BaseTupleConverter { + + private static final int MAX_BIG_DECIMAL_PRECISION = 18; + private static final int MAX_INT_PRECISION = 10; + + private final Type parquetType; + + static boolean accepts(Type type) { + PrimitiveTypeName ptn = type.asPrimitiveType().getPrimitiveTypeName(); + return (type.isPrimitive() && ptn == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); + } + + public TupleDecimalConverter(Type parquetType, ParquetTupleConverter parent, int index) { + super(parent, index); + this.parquetType = parquetType; + } + + @Override + public void addBinary(Binary value) { + PrimitiveTypeName ptn = parquetType.asPrimitiveType().getPrimitiveTypeName(); + if (ptn == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { + DecimalMetadata dm = parquetType.asPrimitiveType().getDecimalMetadata(); + int precision = dm.getPrecision(); + int scale = dm.getScale(); + BigDecimal bigDecimal = DecimalUtils.binaryToDecimal(value, precision, scale); + if (scale != 0 || precision > MAX_BIG_DECIMAL_PRECISION) { + parent.getCurrentTuple().setString(index, bigDecimal.toPlainString()); + } else if (precision <= MAX_INT_PRECISION) { + parent.getCurrentTuple().setInteger(index, bigDecimal.intValue()); + } else { + parent.getCurrentTuple().setLong(index, bigDecimal.longValue()); + } + } else { + parent.getCurrentTuple().setString(index, value.toStringUsingUTF8()); + } + } + } + + static final class TupleInt96DateConverter extends BaseTupleConverter { + + static boolean accepts(Type type) { + PrimitiveTypeName ptn = type.asPrimitiveType().getPrimitiveTypeName(); + return (type.isPrimitive() && ptn == PrimitiveTypeName.INT96); + } + + public TupleInt96DateConverter(ParquetTupleConverter parent, int index) { + super(parent, index); + } + + @Override + public void addBinary(Binary value) { + Timestamp ts = ParquetTimestampUtils.asTimestamp(value); + parent.getCurrentTuple().setString(index, ts.toString()); + } + } } diff --git a/scalding-parquet/src/test/java/com/twitter/scalding/parquet/convert/ParquetTimestampUtilsTest.java b/scalding-parquet/src/test/java/com/twitter/scalding/parquet/convert/ParquetTimestampUtilsTest.java new file mode 100644 index 0000000000..f93f8caca1 --- /dev/null +++ b/scalding-parquet/src/test/java/com/twitter/scalding/parquet/convert/ParquetTimestampUtilsTest.java @@ -0,0 +1,61 @@ +package com.twitter.scalding.parquet.convert; + +import static com.twitter.scalding.parquet.convert.ParquetTimestampUtils.asTimestamp; +import static com.twitter.scalding.parquet.convert.ParquetTimestampUtils.getTimestampMillis; +import static org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils.getNanoTime; +import static org.apache.parquet.io.api.Binary.fromConstantByteBuffer; +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; +import org.apache.parquet.io.api.Binary; +import org.junit.Test; + +public class ParquetTimestampUtilsTest { + + @Test + public void testGetTimestampMillis() { + assertTimestampCorrect("2011-01-01 00:00:00.000000000"); + assertTimestampCorrect("2001-01-01 01:01:01.000000001"); + assertTimestampCorrect("2015-12-31 23:59:59.999999999"); + } + + @Test + public void testInvalidBinaryLength() { + try { + byte[] invalidLengthBinaryTimestamp = new byte[8]; + getTimestampMillis(Binary.fromByteArray(invalidLengthBinaryTimestamp)); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), "Parquet timestamp must be 12 bytes, actual 8"); + } + } + + @Test + public void testGetTimestamp() { + assertTimestampNanosCorrect("2011-01-01 00:00:00.0"); + assertTimestampNanosCorrect("2001-01-01 01:01:01.000000001"); + assertTimestampNanosCorrect("2015-12-31 23:59:59.999999999"); + assertTimestampNanosCorrect("2015-12-17 16:08:01.987654321"); + } + + private static void assertTimestampCorrect(String timestampString) { + Timestamp timestamp = Timestamp.valueOf(timestampString); + NanoTime nanoTime = getNanoTime(timestamp, false); + ByteBuffer buffer = ByteBuffer.wrap(nanoTime.toBinary().getBytes()); + long decodedTimestampMillis = ParquetTimestampUtils + .getTimestampMillis(fromConstantByteBuffer(buffer)); + assertEquals(decodedTimestampMillis, timestamp.getTime()); + } + + private static void assertTimestampNanosCorrect(String timestampString) { + Timestamp timestamp = Timestamp.valueOf(timestampString); + NanoTime nanoTime = getNanoTime(timestamp, false); + ByteBuffer buffer = ByteBuffer.wrap(nanoTime.toBinary().getBytes()); + + Timestamp result = asTimestamp(fromConstantByteBuffer(buffer)); + + assertEquals(timestampString, result.toString()); + } + +} diff --git a/scalding-parquet/src/test/java/com/twitter/scalding/parquet/convert/TestDecimalUtils.java b/scalding-parquet/src/test/java/com/twitter/scalding/parquet/convert/TestDecimalUtils.java new file mode 100644 index 0000000000..88a45db018 --- /dev/null +++ b/scalding-parquet/src/test/java/com/twitter/scalding/parquet/convert/TestDecimalUtils.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.twitter.scalding.parquet.convert; + +import static org.junit.Assert.assertEquals; + +import java.math.BigDecimal; +import org.apache.parquet.io.api.Binary; +import org.junit.Test; + +public class TestDecimalUtils { + + private void testDecimalConversion(double value, int precision, int scale, String stringValue) { + String originalString = Double.toString(value); + BigDecimal originalValue = new BigDecimal(originalString); + BigDecimal convertedValue = DecimalUtils + .binaryToDecimal(Binary.fromByteArray(originalValue.unscaledValue().toByteArray()), + precision,scale); + assertEquals(stringValue, convertedValue.toString()); + } + + private void testDecimalConversion(int value, int precision, int scale, String stringValue) { + String originalString = Integer.toString(value); + BigDecimal originalValue = new BigDecimal(originalString); + BigDecimal convertedValue = DecimalUtils + .binaryToDecimal(Binary.fromByteArray(originalValue.unscaledValue().toByteArray()), + precision,scale); + assertEquals(stringValue, convertedValue.toString()); + } + + private void testDecimalConversion(long value, int precision, int scale, String stringValue) { + String originalString = Long.toString(value); + BigDecimal originalValue = new BigDecimal(originalString); + BigDecimal convertedValue = DecimalUtils + .binaryToDecimal(Binary.fromByteArray(originalValue.unscaledValue().toByteArray()), + precision, scale); + assertEquals(stringValue, convertedValue.toString()); + } + + @Test + public void testBinaryToDecimal() throws Exception { + // Known issue: testing Nx10^M doubles from BigDecimal.unscaledValue() always converts to Nx10 regardless of M + // Known issue: any double with precision > 17 breaks in test but not in functional testing + + // Test LONG + testDecimalConversion(Long.MAX_VALUE,19,0,"9223372036854775807"); + testDecimalConversion(Long.MIN_VALUE,19,0,"-9223372036854775808"); + testDecimalConversion(0L,0,0,"0.0"); + + // Test INTEGER + testDecimalConversion(Integer.MAX_VALUE,10,0,"2147483647"); + testDecimalConversion(Integer.MIN_VALUE,10,0,"-2147483648"); + testDecimalConversion(0,0,0,"0.0"); + + // Test DOUBLE + testDecimalConversion(12345678912345678d,17,0,"12345678912345678"); + testDecimalConversion(123456789123456.78,17,2,"123456789123456.78"); + testDecimalConversion(0.12345678912345678,17,17,"0.12345678912345678"); + testDecimalConversion(-0.000102,6,6,"-0.000102"); + } +} diff --git a/scalding-parquet/src/test/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverterTest.java b/scalding-parquet/src/test/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverterTest.java new file mode 100644 index 0000000000..208e1535ef --- /dev/null +++ b/scalding-parquet/src/test/java/com/twitter/scalding/parquet/tuple/ParquetTupleConverterTest.java @@ -0,0 +1,279 @@ +package com.twitter.scalding.parquet.tuple; + +import static org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils.getNanoTime; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.junit.Assert.assertThat; + +import cascading.tuple.Tuple; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class ParquetTupleConverterTest { + + private static GroupType parquetSchema; + + private static ParquetTupleConverter converter; + private static PrimitiveConverter tupleConverter; + + @BeforeClass + public static void setUpClass() { + String parquetSchemaString = "message spark_schema {\n" + + " optional binary basicString (UTF8);\n" + + " optional int32 int64field;\n" + + " optional int64 int64field;\n" + + " optional fixed_len_byte_array(8) decimal180 (DECIMAL(18,0));\n" + + " optional fixed_len_byte_array(2) decimal40 (DECIMAL(4,0));\n" + + " optional fixed_len_byte_array(8) decimal100 (DECIMAL(10,0));\n" + + " optional fixed_len_byte_array(8) decimal120 (DECIMAL(12,0));\n" + + " optional fixed_len_byte_array(8) decimal82 (DECIMAL(8,2));\n" + + " optional fixed_len_byte_array(8) decimal122 (DECIMAL(12,2));\n" + + " optional fixed_len_byte_array(10) decimal190 (DECIMAL(19,0));\n" + + " optional fixed_len_byte_array(10) decimal191 (DECIMAL(19,1));\n" + + " optional int32 int32date (DATE);\n" + + " optional int96 int96date;\n" + + "}"; + + parquetSchema = MessageTypeParser.parseMessageType(parquetSchemaString); + } + + @Before + public void setUp() { + converter = new ParquetTupleConverter(parquetSchema); + } + + @Test + public void testConverterCreation() { + assertThat(converter.getConverter(0), + is(instanceOf(ParquetTupleConverter.TuplePrimitiveConverter.class))); + assertThat(converter.getConverter(1), + is(instanceOf(ParquetTupleConverter.TuplePrimitiveConverter.class))); + assertThat(converter.getConverter(2), + is(instanceOf(ParquetTupleConverter.TuplePrimitiveConverter.class))); + assertThat(converter.getConverter(3), + is(instanceOf(ParquetTupleConverter.TupleDecimalConverter.class))); + assertThat(converter.getConverter(4), + is(instanceOf(ParquetTupleConverter.TupleDecimalConverter.class))); + assertThat(converter.getConverter(5), + is(instanceOf(ParquetTupleConverter.TupleDecimalConverter.class))); + assertThat(converter.getConverter(6), + is(instanceOf(ParquetTupleConverter.TupleDecimalConverter.class))); + assertThat(converter.getConverter(7), + is(instanceOf(ParquetTupleConverter.TupleDecimalConverter.class))); + assertThat(converter.getConverter(8), + is(instanceOf(ParquetTupleConverter.TupleDecimalConverter.class))); + assertThat(converter.getConverter(9), + is(instanceOf(ParquetTupleConverter.TupleDecimalConverter.class))); + assertThat(converter.getConverter(10), + is(instanceOf(ParquetTupleConverter.TupleDecimalConverter.class))); + assertThat(converter.getConverter(11), + is(instanceOf(ParquetTupleConverter.TuplePrimitiveConverter.class))); + assertThat(converter.getConverter(12), + is(instanceOf(ParquetTupleConverter.TupleInt96DateConverter.class))); + } + + @Test + public void testConvertDecimal180() { + int index = 3; + tupleConverter = (PrimitiveConverter) converter.getConverter(index); + + assertThat(tupleConverter, is(notNullValue())); + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.asLongBuffer().put(12345); + Binary value = Binary.fromConstantByteBuffer(buffer); + + converter.start(); + tupleConverter.addBinary(value); + converter.end(); + + Tuple tuple = converter.getCurrentTuple(); + + assertThat(tuple.getLong(index), is(12345L)); + } + + @Test + public void testConvertDecimal40() { + int index = 4; + tupleConverter = (PrimitiveConverter) converter.getConverter(index); + + assertThat(tupleConverter, is(notNullValue())); + + ByteBuffer buffer = ByteBuffer.wrap(intToTwoByteArray(123)); + Binary value = Binary.fromConstantByteBuffer(buffer); + + converter.start(); + tupleConverter.addBinary(value); + converter.end(); + + Tuple tuple = converter.getCurrentTuple(); + + assertThat(tuple.getInteger(index), is(123)); + } + + @Test + public void testConvertDecimal100() { + int index = 5; + tupleConverter = (PrimitiveConverter) converter.getConverter(index); + + assertThat(tupleConverter, is(notNullValue())); + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.asLongBuffer().put(1234567890); + Binary value = Binary.fromConstantByteBuffer(buffer); + + converter.start(); + tupleConverter.addBinary(value); + converter.end(); + + Tuple tuple = converter.getCurrentTuple(); + + assertThat(tuple.getInteger(index), is(1234567890)); + } + + @Test + public void testConvertDecimal120() { + int index = 6; + tupleConverter = (PrimitiveConverter) converter.getConverter(index); + + assertThat(tupleConverter, is(notNullValue())); + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.asLongBuffer().put(123456789012L); + Binary value = Binary.fromConstantByteBuffer(buffer); + + converter.start(); + tupleConverter.addBinary(value); + converter.end(); + + Tuple tuple = converter.getCurrentTuple(); + + assertThat(tuple.getLong(index), is(123456789012L)); + } + + @Test + public void testConvertDecimal82() { + int index = 7; + tupleConverter = (PrimitiveConverter) converter.getConverter(index); + + assertThat(tupleConverter, is(notNullValue())); + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.asLongBuffer().put(1234567); + Binary value = Binary.fromConstantByteBuffer(buffer); + + converter.start(); + tupleConverter.addBinary(value); + converter.end(); + + Tuple tuple = converter.getCurrentTuple(); + + assertThat(tuple.getString(index), is("12345.67")); + } + + @Test + public void testConvertDecimal122() { + int index = 8; + tupleConverter = (PrimitiveConverter) converter.getConverter(index); + + assertThat(tupleConverter, is(notNullValue())); + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.asLongBuffer().put(1234567891); + Binary value = Binary.fromConstantByteBuffer(buffer); + + converter.start(); + tupleConverter.addBinary(value); + converter.end(); + + Tuple tuple = converter.getCurrentTuple(); + + assertThat(tuple.getString(index), is("12345678.91")); + } + + @Test + public void testConvertDecimal190() { + int index = 9; + tupleConverter = (PrimitiveConverter) converter.getConverter(index); + + assertThat(tupleConverter, is(notNullValue())); + Binary value = Binary + .fromConstantByteArray(new BigInteger("1234567890123456789").toByteArray()); + converter.start(); + tupleConverter.addBinary(value); + converter.end(); + + Tuple tuple = converter.getCurrentTuple(); + + assertThat(tuple.getString(index), is("1234567890123456789")); + } + + @Test + public void testConvertDecimal191() { + int index = 10; + tupleConverter = (PrimitiveConverter) converter.getConverter(index); + + assertThat(tupleConverter, is(notNullValue())); + Binary value = Binary + .fromConstantByteArray(new BigInteger("1234567890123456789").toByteArray()); + + converter.start(); + tupleConverter.addBinary(value); + converter.end(); + + Tuple tuple = converter.getCurrentTuple(); + + assertThat(tuple.getString(index), is("123456789012345678.9")); + } + + @Test + public void testConvertInt96Date() { + int index = 12; + tupleConverter = (PrimitiveConverter) converter.getConverter(index); + + assertThat(tupleConverter, is(notNullValue())); + Binary value = timestampStringToBinary("2011-01-01 00:00:00.123100001"); + + converter.start(); + tupleConverter.addBinary(value); + converter.end(); + + Tuple tuple = converter.getCurrentTuple(); + + assertThat(tuple.getString(index), is("2011-01-01 00:00:00.123100001")); + } + + @Test + public void testConvertInt96DateAnoherDate() { + int index = 12; + tupleConverter = (PrimitiveConverter) converter.getConverter(index); + + assertThat(tupleConverter, is(notNullValue())); + Binary value = timestampStringToBinary("2015-12-17 16:19:59.192837465"); + + converter.start(); + tupleConverter.addBinary(value); + converter.end(); + + Tuple tuple = converter.getCurrentTuple(); + + assertThat(tuple.getString(index), is("2015-12-17 16:19:59.192837465")); + } + + public static final byte[] intToTwoByteArray(int value) { + return new byte[]{ + (byte) (value >>> 8), + (byte) value}; + } + + public static Binary timestampStringToBinary(String timestampString) { + Timestamp timestamp = Timestamp.valueOf(timestampString); + NanoTime nanoTime = getNanoTime(timestamp, false); + return nanoTime.toBinary(); + } +}