Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ val slf4jVersion = "1.6.6"
val thriftVersion = "0.9.3"
val junitVersion = "4.10"
val jlineVersion = "2.14.3"
val hiveVersion = "2.2.0"

val printDependencyClasspath = taskKey[Unit]("Prints location of the dependencies")

Expand Down Expand Up @@ -423,8 +424,9 @@ lazy val scaldingParquet = module("parquet").settings(
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"com.twitter" %% "bijection-macros" % bijectionVersion,
"com.twitter" %% "chill-bijection" % chillVersion,
"com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion % "test"
),
"com.twitter.elephantbird" % "elephant-bird-core" % elephantbirdVersion % "test",
"org.apache.hive" % "hive-exec" % hiveVersion % "test" intransitive()
),
addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full))
.dependsOn(scaldingCore, scaldingHadoopTest % "test", scaldingParquetFixtures % "test->test")

Expand All @@ -436,7 +438,8 @@ lazy val scaldingParquetScroogeFixtures = module("parquet-scrooge-fixtures")
scroogeLanguages in Test := Seq("java", "scala"),
libraryDependencies ++= Seq(
"com.twitter" %% "scrooge-serializer" % scroogeVersion % "provided"
exclude("com.google.guava", "guava"),
exclude("com.google.guava", "guava")
exclude("org.apache.hive", "hive-exec" ),
"commons-lang" % "commons-lang" % apacheCommonsVersion, // needed for HashCodeBuilder used in thriftjava
"org.apache.thrift" % "libthrift" % thriftVersion
)
Expand All @@ -450,14 +453,14 @@ lazy val scaldingParquetScrooge = module("parquet-scrooge")
"org.apache.parquet" % "parquet-thrift" % parquetVersion % "test" classifier "tests"
exclude("org.apache.parquet", "parquet-pig")
exclude("com.twitter.elephantbird", "elephant-bird-pig")
exclude("com.twitter.elephantbird", "elephant-bird-core"),
exclude("com.twitter.elephantbird", "elephant-bird-core")
exclude("org.apache.hive", "hive-exec" ),
"com.twitter" %% "scrooge-serializer" % scroogeVersion
exclude("com.google.guava", "guava"),
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
"com.novocode" % "junit-interface" % "0.11" % "test",
"junit" % "junit" % junitVersion % "test"

)
)
).dependsOn(scaldingCore, scaldingParquet % "compile->compile;test->test", scaldingParquetScroogeFixtures % "test->test")

lazy val scaldingHRaven = module("hraven").settings(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.scalding.parquet.convert;

import static java.lang.Math.pow;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import org.apache.parquet.io.api.Binary;

/**
* @see org.apache.parquet.pig.convert.DecimalUtils
*/
public class DecimalUtils {

private DecimalUtils() {
}

public static BigDecimal binaryToDecimal(Binary value, int precision, int scale) {
/*
* Precision <= 18 checks for the max number of digits for an unscaled long,
* else treat with big integer conversion
*/
if (precision <= 18) {
ByteBuffer buffer = value.toByteBuffer();
byte[] bytes = buffer.array();
int start = buffer.arrayOffset() + buffer.position();
int end = buffer.arrayOffset() + buffer.limit();
long unscaled = 0L;
int i = start;
while (i < end) {
unscaled = (unscaled << 8 | bytes[i] & 0xff);
i++;
}
int bits = 8 * (end - start);
long unscaledNew = (unscaled << (64 - bits)) >> (64 - bits);
if (unscaledNew <= -pow(10, 18) || unscaledNew >= pow(10, 18)) {
return new BigDecimal(unscaledNew);
} else {
return BigDecimal.valueOf(unscaledNew / pow(10, scale));
}
} else {
return new BigDecimal(new BigInteger(value.getBytes()), scale);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.scalding.parquet.convert;

import java.sql.Timestamp;
import org.apache.parquet.example.data.simple.NanoTime;
import org.apache.parquet.io.api.Binary;

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

/**
* Utility class for decoding INT96 encoded parquet timestamp to timestamp millis in GMT. This class
* is equivalent of @see org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime, which produces
* less intermediate objects during decoding. Derived from:
*
* @link https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTimestampUtils.java
* @link https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/example/data/simple/NanoTime.java
*/
public final class ParquetTimestampUtils {

  // Julian day number of the unix epoch (1970-01-01).
  private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588;
  private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
  private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
  private static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);

  private ParquetTimestampUtils() {
  }

  /**
   * Decodes a 12-byte INT96 parquet timestamp (8 bytes time-of-day nanos
   * followed by 4 bytes julian day, both little-endian) to epoch millis, GMT.
   *
   * @param timestampBinary INT96 parquet timestamp
   * @return timestamp in millis, GMT timezone
   * @throws IllegalArgumentException if the binary is not exactly 12 bytes
   */
  public static long getTimestampMillis(Binary timestampBinary) {
    if (timestampBinary.length() != 12) {
      throw new IllegalArgumentException(
          "Parquet timestamp must be 12 bytes, actual " + timestampBinary.length());
    }
    // Read both fields through a single little-endian view instead of
    // manually reversing the byte order.
    ByteBuffer buf = ByteBuffer.wrap(timestampBinary.getBytes())
        .order(java.nio.ByteOrder.LITTLE_ENDIAN);
    long nanosOfDay = buf.getLong(0);
    int julianDay = buf.getInt(8);

    return julianDayToMillis(julianDay) + nanosOfDay / NANOS_PER_MILLISECOND;
  }

  // Converts a julian day number to epoch millis at midnight GMT.
  private static long julianDayToMillis(int julianDay) {
    int daysSinceEpoch = julianDay - JULIAN_EPOCH_OFFSET_DAYS;
    return daysSinceEpoch * MILLIS_IN_DAY;
  }

  /**
   * Decodes an INT96 parquet timestamp to a {@link Timestamp} with full
   * nanosecond resolution (setNanos replaces the fractional second).
   */
  public static Timestamp asTimestamp(Binary timestampBinary) {
    NanoTime nanoTime = NanoTime.fromBinary(timestampBinary);
    long nanosOfDay = nanoTime.getTimeOfDayNanos();
    Timestamp result =
        new Timestamp(julianDayToMillis(nanoTime.getJulianDay()) + nanosOfDay / NANOS_PER_MILLISECOND);
    result.setNanos((int) (nanosOfDay % NANOS_PER_SECOND));
    return result;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@

import cascading.tuple.Tuple;

import com.twitter.scalding.parquet.convert.DecimalUtils;
import com.twitter.scalding.parquet.convert.ParquetTimestampUtils;
import java.math.BigDecimal;
import java.sql.Timestamp;

import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;

public class ParquetTupleConverter extends GroupConverter {
Expand All @@ -25,10 +32,18 @@ public ParquetTupleConverter(GroupType parquetSchema) {
}

private Converter newConverter(Type type, int i) {
if(!type.isPrimitive()) {
if (!type.isPrimitive()) {
throw new IllegalArgumentException("cascading can only build tuples from primitive types");
} else {
return new TuplePrimitiveConverter(this, i);
Converter converter = null;
if (TupleDecimalConverter.accepts(type)) {
converter = new TupleDecimalConverter(type, this, i);
} else if (TupleInt96DateConverter.accepts(type)) {
converter = new TupleInt96DateConverter(this, i);
} else {
converter = new TuplePrimitiveConverter(this, i);
}
return converter;
}
}

Expand All @@ -51,6 +66,7 @@ final public Tuple getCurrentTuple() {
}

static final class TuplePrimitiveConverter extends PrimitiveConverter {

private final ParquetTupleConverter parent;
private final int index;

Expand Down Expand Up @@ -89,4 +105,98 @@ public void addLong(long value) {
parent.getCurrentTuple().setLong(index, value);
}
}

/**
 * Common base for specialized primitive converters: forwards every scalar
 * primitive into the parent's current tuple at a fixed position, leaving only
 * binary handling to subclasses.
 */
static abstract class BaseTupleConverter extends PrimitiveConverter {

  // Field names are relied on by subclasses; do not rename.
  protected final ParquetTupleConverter parent;
  protected final int index;

  /**
   * @param owner         converter that owns the tuple being filled
   * @param fieldPosition tuple position this converter writes to
   */
  public BaseTupleConverter(ParquetTupleConverter owner, int fieldPosition) {
    this.parent = owner;
    this.index = fieldPosition;
  }

  /** Binary handling differs per subclass (decimal vs INT96 timestamp). */
  abstract public void addBinary(Binary value);

  @Override
  public void addInt(int value) {
    parent.getCurrentTuple().setInteger(index, value);
  }

  @Override
  public void addLong(long value) {
    parent.getCurrentTuple().setLong(index, value);
  }

  @Override
  public void addFloat(float value) {
    parent.getCurrentTuple().setFloat(index, value);
  }

  @Override
  public void addDouble(double value) {
    parent.getCurrentTuple().setDouble(index, value);
  }

  @Override
  public void addBoolean(boolean value) {
    parent.getCurrentTuple().setBoolean(index, value);
  }
}

/**
 * Converter for FIXED_LEN_BYTE_ARRAY columns. When the column carries DECIMAL
 * metadata the value is decoded via {@link DecimalUtils}; otherwise the raw
 * bytes are stored as a UTF-8 string.
 */
static final class TupleDecimalConverter extends BaseTupleConverter {

  // Max digits of an unscaled long / of an int.
  private static final int MAX_BIG_DECIMAL_PRECISION = 18;
  private static final int MAX_INT_PRECISION = 10;

  private final Type parquetType;

  /** True iff {@code type} is a primitive FIXED_LEN_BYTE_ARRAY column. */
  static boolean accepts(Type type) {
    // BUG FIX: isPrimitive() must be evaluated BEFORE asPrimitiveType() —
    // asPrimitiveType() throws ClassCastException on group types, so the old
    // order defeated the guard.
    return type.isPrimitive()
        && type.asPrimitiveType().getPrimitiveTypeName() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
  }

  public TupleDecimalConverter(Type parquetType, ParquetTupleConverter parent, int index) {
    super(parent, index);
    this.parquetType = parquetType;
  }

  @Override
  public void addBinary(Binary value) {
    PrimitiveTypeName ptn = parquetType.asPrimitiveType().getPrimitiveTypeName();
    DecimalMetadata dm = (ptn == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
        ? parquetType.asPrimitiveType().getDecimalMetadata()
        : null;
    if (dm == null) {
      // BUG FIX: a FIXED_LEN_BYTE_ARRAY column without DECIMAL metadata used
      // to NPE on dm.getPrecision(); fall back to the UTF-8 string branch.
      parent.getCurrentTuple().setString(index, value.toStringUsingUTF8());
      return;
    }
    int precision = dm.getPrecision();
    int scale = dm.getScale();
    BigDecimal bigDecimal = DecimalUtils.binaryToDecimal(value, precision, scale);
    if (scale != 0 || precision > MAX_BIG_DECIMAL_PRECISION) {
      // Not representable as an exact integral primitive: keep full fidelity
      // as a plain decimal string.
      parent.getCurrentTuple().setString(index, bigDecimal.toPlainString());
    } else if (precision <= MAX_INT_PRECISION) {
      parent.getCurrentTuple().setInteger(index, bigDecimal.intValue());
    } else {
      parent.getCurrentTuple().setLong(index, bigDecimal.longValue());
    }
  }
}

/**
 * Converter for INT96 columns (the legacy Hive/Impala timestamp encoding).
 * The decoded timestamp is stored as its {@link java.sql.Timestamp} string form.
 */
static final class TupleInt96DateConverter extends BaseTupleConverter {

  /** True iff {@code type} is a primitive INT96 column. */
  static boolean accepts(Type type) {
    // BUG FIX: isPrimitive() must be evaluated BEFORE asPrimitiveType() —
    // asPrimitiveType() throws ClassCastException on group types, so the old
    // order defeated the guard.
    return type.isPrimitive()
        && type.asPrimitiveType().getPrimitiveTypeName() == PrimitiveTypeName.INT96;
  }

  public TupleInt96DateConverter(ParquetTupleConverter parent, int index) {
    super(parent, index);
  }

  @Override
  public void addBinary(Binary value) {
    Timestamp ts = ParquetTimestampUtils.asTimestamp(value);
    parent.getCurrentTuple().setString(index, ts.toString());
  }
}
}
Loading