Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,32 @@ import java.util.Comparator
import scala.annotation.tailrec
import scala.collection.JavaConverters._

/**
 * Particularly unsafe conversions (any TraversableOnce[Any] becomes Fields),
 * kept out of the default conversion traits so callers must opt in explicitly
 * by mixing in this trait or importing the companion object. Not recommended.
 */
trait DeprecatedFieldConversions extends LowPriorityFieldConversions {
  // Delegates to the mixed-element sequence parser from LowPriorityFieldConversions.
  implicit def travOnceToFields[T](t: TraversableOnce[Any]): Fields = {
    val converted = parseAnySeqToFields(t)
    converted
  }
}

/** Importable instance: `import DeprecatedFieldConversions._`. */
object DeprecatedFieldConversions extends DeprecatedFieldConversions

trait LowPriorityFieldConversions {
/**
 * Useful to convert f : Any* to Fields. This handles mixed cases ("hey", 'you).
 * Not sure we should be this flexible, but given that Cascading will throw an
 * exception before scheduling the job, I guess this is okay.
 *
 * Any element that is a Field[_] also contributes its Ordering as a
 * Cascading comparator on the resulting Fields.
 */
def parseAnySeqToFields[T <: TraversableOnce[Any]](anyf: T): Fields = {
  // Materialize exactly once: a TraversableOnce (e.g. an Iterator) may only be
  // traversed a single time, so the original `anyf.toSeq` followed by
  // `anyf.foreach` silently dropped comparators for Iterator inputs.
  val elems = anyf.toSeq
  val fields = new Fields(elems.map { anyToFieldArg }: _*)
  // Register comparators for strongly-typed Field elements.
  elems.foreach {
    case field: Field[_] => fields.setComparator(field.id, field.ord)
    case _ => ()
  }
  fields
}

protected def anyToFieldArg(f: Any): Comparable[_] = f match {
case x: Symbol => x.name
Expand All @@ -49,13 +74,18 @@ trait LowPriorityFieldConversions {
* Lists are handled by an implicit in FieldConversions, which have
* higher priority.
*/
implicit def productToFields(f: Product): Fields = {
val fields = new Fields(f.productIterator.map { anyToFieldArg }.toSeq: _*)
f.productIterator.foreach {
case field: Field[_] => fields.setComparator(field.id, field.ord)
case _ =>
}
fields
/**
 * Convert a general Product (e.g. a mixed tuple such as ("a", 'b)) to Fields.
 * Lists are special-cased: List extends Product, so this implicit is very
 * dangerous; element-wise parsing is delegated for that case.
 */
implicit def productToFields(f: Product): Fields = f match {
  case l: List[_] =>
    // List is a Product too; treat it as a sequence of field names, not positionally.
    parseAnySeqToFields(l)
  case prod =>
    val args = prod.productIterator.map(anyToFieldArg).toSeq
    val result = new Fields(args: _*)
    // Attach comparators for any typed Field members of the product.
    prod.productIterator.collect { case fld: Field[_] => fld }.foreach { fld =>
      result.setComparator(fld.id, fld.ord)
    }
    result
}
}

Expand Down Expand Up @@ -169,25 +199,12 @@ trait FieldConversions extends LowPriorityFieldConversions {
/** Convert every value of an Enumeration into Fields named by their string forms. */
implicit def fromEnum[T <: Enumeration](enumeration: T): Fields = {
  val names = enumeration.values.toList.map(_.toString)
  new Fields(names: _*)
}

implicit def fields[T <: TraversableOnce[Symbol]](f: T): Fields = new Fields(f.toSeq.map(_.name): _*)
implicit def strFields[T <: TraversableOnce[String]](f: T): Fields = new Fields(f.toSeq: _*)
implicit def intFields[T <: TraversableOnce[Int]](f: T): Fields = {
new Fields(f.toSeq.map { new java.lang.Integer(_) }: _*)
}
implicit def fieldFields[T <: TraversableOnce[Field[_]]](f: T): RichFields = RichFields(f.toSeq)
/**
* Useful to convert f : Any* to Fields. This handles mixed cases ("hey", 'you).
* Not sure we should be this flexible, but given that Cascading will throw an
* exception before scheduling the job, I guess this is okay.
*/
implicit def parseAnySeqToFields[T <: TraversableOnce[Any]](anyf: T): Fields = {
val fields = new Fields(anyf.toSeq.map { anyToFieldArg }: _*)
anyf.foreach {
case field: Field[_] => fields.setComparator(field.id, field.ord)
case _ =>
}
fields
}
/** Convert a collection of Symbols to Fields via their names. */
def fields[T <: TraversableOnce[Symbol]](f: T): Fields = {
  val names = f.toSeq.map(sym => sym.name)
  new Fields(names: _*)
}

/** Convert a collection of Strings directly to Fields. */
def strFields[T <: TraversableOnce[String]](f: T): Fields = {
  val names = f.toSeq
  new Fields(names: _*)
}

/** Convert a collection of Ints to Fields (boxed, as Cascading's API requires). */
def intFields[T <: TraversableOnce[Int]](f: T): Fields = {
  val boxed = f.toSeq.map(i => Integer.valueOf(i))
  new Fields(boxed: _*)
}

/** Typed Field collections keep their orderings by going through RichFields. */
implicit def fieldFields[T <: TraversableOnce[Field[_]]](f: T): Fields =
  RichFields(f.toSeq)

//Handle a pair generally:
implicit def tuple2ToFieldsPair[T, U](pair: (T, U))(implicit tf: T => Fields, uf: U => Fields): (Fields, Fields) = {
Expand All @@ -199,7 +216,7 @@ trait FieldConversions extends LowPriorityFieldConversions {
* We can't set the field Manifests because cascading doesn't (yet) expose field type information
* in the Fields API.
*/
implicit def fieldsToRichFields(fields: Fields): RichFields = {
implicit def fieldsToRichFields(fields: Fields): RichFields =
if (!fields.isDefined) {
// TODO We could provide a reasonable conversion here by designing a rich type hierarchy such as
// Fields
Expand All @@ -211,20 +228,21 @@ trait FieldConversions extends LowPriorityFieldConversions {
// of the appropriate type
sys.error("virtual Fields cannot be converted to RichFields")
}
else {

// This bit is kludgy because cascading provides different interfaces for extracting
// IDs and Comparators from a Fields instance. (The IDs are only available
// "one at a time" by querying for a specific index, while the Comparators are only
// available "all at once" by calling getComparators.)
// This bit is kludgy because cascading provides different interfaces for extracting
// IDs and Comparators from a Fields instance. (The IDs are only available
// "one at a time" by querying for a specific index, while the Comparators are only
// available "all at once" by calling getComparators.)

new RichFields(asList(fields).zip(fields.getComparators).map {
case (id: Comparable[_], comparator: Comparator[_]) => id match {
case x: java.lang.Integer => IntField(x)(Ordering.comparatorToOrdering(comparator), None)
case y: String => StringField(y)(Ordering.comparatorToOrdering(comparator), None)
case z => sys.error("not expecting object of type " + z.getClass + " as field name")
}
})
}
new RichFields(asList(fields).zip(fields.getComparators).map {
case (id: Comparable[_], comparator: Comparator[_]) => id match {
case x: java.lang.Integer => IntField(x)(Ordering.comparatorToOrdering(comparator), None)
case y: String => StringField(y)(Ordering.comparatorToOrdering(comparator), None)
case z => sys.error("not expecting object of type " + z.getClass + " as field name")
}
})
}

}

Expand All @@ -234,7 +252,7 @@ trait FieldConversions extends LowPriorityFieldConversions {
// val myFields: Fields = ...
// myFields.toFieldList

/**
 * A Fields that also retains each Field's Ordering, so comparator information
 * survives the conversion into Cascading's Fields representation.
 */
case class RichFields(toFieldList: List[Field[_]]) extends Fields(toFieldList.map(_.id): _*) {
  // Re-register each field's ordering as a Cascading comparator on this instance.
  toFieldList.foreach { f => setComparator(f.id, f.ord) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,13 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
//CTuple's have unknown arity so we have to put them into a Tuple1 in the middle phase:
mapReduceMap(fd) { ctuple: CTuple => Tuple1(ctuple) } { (oldVal, newVal) => oldVal } { result => result._1 }
}
/** Keep the first value seen in each group; same as head(f -> f) on the named fields. */
def head(f: Symbol*): Self = head((fields(f), fields(f)))

/** Keep the last value seen in each group for the given (from -> to) field mapping. */
def last(fd: (Fields, Fields)) = {
  // CTuples have unknown arity, so box each one in a Tuple1 for the middle phase.
  mapReduceMap(fd) { ct: CTuple => Tuple1(ct) } { (previous, incoming) => incoming } { boxed => boxed._1 }
}
/** Keep the last value seen in each group; same as last(f -> f) on the named fields. */
def last(f: Symbol*): Self = last((fields(f), fields(f)))

/**
* Collect all the values into a List[T] and then operate on that
Expand Down Expand Up @@ -239,9 +239,9 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
mapReduceMap(fieldDef) { ctuple: CTuple => Tuple1(ctuple) } { (oldVal, newVal) => if (select(oldVal._1, newVal._1)) oldVal else newVal } { result => result._1 }
}
/** Maximum of the grouped values under the given (from -> to) field mapping. */
def max(fieldDef: (Fields, Fields)) = extremum(true, fieldDef)

/** Maximum over the named symbol fields, written back to the same fields. */
def max(f: Symbol*) = extremum(true, (fields(f), fields(f)))

/** Minimum of the grouped values under the given (from -> to) field mapping. */
def min(fieldDef: (Fields, Fields)) = extremum(false, fieldDef)

/** Minimum over the named symbol fields, written back to the same fields. */
def min(f: Symbol*) = extremum(false, (fields(f), fields(f)))

/**
* Similar to the scala.collection.Iterable.mkString
Expand Down Expand Up @@ -288,7 +288,7 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
/** Same as reduce(f -> f): folds the named fields with fn, writing the result back to them. */
def reduce[T](fieldDef: Symbol*)(fn: (T, T) => T)(implicit setter: TupleSetter[T],
  conv: TupleConverter[T]): Self =
  reduce((fields(fieldDef), fields(fieldDef)))(fn)(setter, conv)

// Abstract algebra reductions (sum, times, dot):
Expand All @@ -309,7 +309,7 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
* Assumed to be a commutative operation. If you don't want that, use .forceToReducers
*/
def sum[T](fs: Symbol*)(implicit sg: Semigroup[T], tconv: TupleConverter[T], tset: TupleSetter[T]): Self = {
  // Same as sum(fs -> fs): reads and writes the same named fields.
  sum[T]((fields(fs), fields(fs)))(sg, tconv, tset)
}

/**
* Returns the product of all the items in this grouping
Expand All @@ -324,7 +324,7 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
* The same as `times(fs -> fs)`
*/
def times[T](fs: Symbol*)(implicit ring: Ring[T], tconv: TupleConverter[T], tset: TupleSetter[T]): Self = {
  // Same as times(fs -> fs): reads and writes the same named fields.
  times[T]((fields(fs), fields(fs)))(ring, tconv, tset)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ object Combinatorics {
}.filter('temp){
x: Double => if (allc.size == numWeights) (math.abs(x - result) <= error) else (x <= result)
}.discard('temp), allc)
})._1.unique(allColumns)
})._1.unique(fields(allColumns))

(1 to numWeights).zip(weights).foldLeft(res) ((a, b) => {
val (num, wt) = b
Expand All @@ -218,7 +218,7 @@ object Combinatorics {
/**
 * Like weightedSum, but additionally filters out rows in which any of the
 * weight columns k1..kN holds 0.0.
 */
def positiveWeightedSum(weights: IndexedSeq[Double], result: Double, error: Double)(implicit flowDef: FlowDef, mode: Mode): Pipe = {
  val allColumns = (1 to weights.size).map(x => Symbol("k" + x))
  weightedSum(weights, result, error)
    .filter(fields(allColumns)) { entry: TupleEntry =>
      // Keep the row only if every column is nonzero.
      allColumns.indices.forall { idx =>
        entry.getDouble(java.lang.Integer.valueOf(idx)) != 0.0
      }
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import cascading.tuple.Fields

import org.scalatest.{ Matchers, WordSpec }

import DeprecatedFieldConversions._

class FieldImpsTest extends WordSpec with Matchers with FieldConversions {
def setAndCheck[T <: Comparable[_]](v: T)(implicit conv: (T) => Fields): Unit = {
conv(v) shouldBe (new Fields(v))
Expand Down