diff --git a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala index 7b60eedc77..dcb0e2e49e 100644 --- a/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala +++ b/scalding-commons/src/main/scala/com/twitter/scalding/commons/source/PailSource.scala @@ -19,7 +19,7 @@ package com.twitter.scalding.commons.source import scala.reflect.ClassTag import com.backtype.cascading.tap.PailTap -import com.backtype.hadoop.pail.PailStructure +import com.backtype.hadoop.pail.{ PailStructure, PailSpec } import cascading.tap.Tap import com.twitter.bijection.Injection import com.twitter.scalding._ @@ -79,9 +79,16 @@ object PailSource { /** * Generic version of Pail sink accepts a PailStructure. + * Prefer the override taking a PailSpec as it gives you finer control over pail configuration (compression for ex) */ def sink[T](rootPath: String, structure: PailStructure[T]): PailSource[T] = - new PailSource(rootPath, structure) + sink(rootPath, PailTap.makeSpec(null, structure)) + + /** + * Generic version of Pail sink accepts a PailSpec. + */ + def sink[T](rootPath: String, spec: PailSpec): PailSource[T] = + new PailSource[T](rootPath, spec) /** * A Pail sink can also build its structure on the fly from a @@ -113,10 +120,19 @@ object PailSource { /** * Generic version of Pail source accepts a PailStructure. + * Prefer the override taking a PailSpec as it gives you finer control over pail configuration (compression for ex) */ def source[T](rootPath: String, structure: PailStructure[T], subPaths: Array[List[String]]): PailSource[T] = { assert(subPaths != null && subPaths.size > 0) - new PailSource(rootPath, structure, subPaths) + new PailSource[T](rootPath, PailTap.makeSpec(null, structure), subPaths) + } + + /** + * Generic version of Pail source accepts a PailSpec. + */ + def source[T](rootPath: String, spec: PailSpec, subPaths: Array[List[String]]): PailSource[T] = { + assert(subPaths != null && subPaths.size > 0) + new PailSource[T](rootPath, spec, subPaths) } /** @@ -145,16 +161,16 @@ object PailSource { } } -class PailSource[T] private (rootPath: String, structure: PailStructure[T], subPaths: Array[List[String]] = null)(implicit conv: TupleConverter[T]) - extends Source with Mappable[T] { +class PailSource[T] private (rootPath: String, spec: PailSpec, subPaths: Array[List[String]] = Array.empty)(implicit conv: TupleConverter[T], tset: TupleSetter[T]) + extends Source with Mappable[T] with TypedSink[T] { import Dsl._ override def converter[U >: T] = TupleConverter.asSuperConverter[T, U](conv) + override def setter[U <: T]: TupleSetter[U] = TupleSetter.asSubSetter(tset) val fieldName = "pailItem" lazy val getTap = { - val spec = PailTap.makeSpec(null, structure) - val javaSubPath = if ((subPaths == null) || (subPaths.size == 0)) null else subPaths map { _.asJava } + val javaSubPath = subPaths map { _.asJava } val opts = new PailTap.PailTapOptions(spec, fieldName, javaSubPath, null) new PailTap(rootPath, opts) } @@ -172,7 +188,6 @@ class PailSource[T] private (rootPath: String, structure: PailStructure[T], subP TestTapFactory(this, tap.getScheme).createTap(readOrWrite)(mode) } } - } /**