(🚧🚜 WIP 👷🚧 ) Split fabric-specific code into individual back-ends#1617
(🚧🚜 WIP 👷🚧 ) Split fabric-specific code into individual back-ends#1617cchepelov wants to merge 44 commits intotwitter:cascading3from
Conversation
- create 4 empty fabric jars - break & split Mode across the "storage" and "execution tech" axis - intentionally retire some of the Mode-related type names to ensure things DO move - fix what breaks at compile time. TESTS NOT YET RUN AT THIS POINT
… toxic to hadoop-commons 2.6.0)
…terns across fabrics)
…ster-test is requested
oscar-stripe
left a comment
There was a problem hiding this comment.
Great work.
One question: is there any way we can reduce the size of the PR.
For instance, can we keep flink out and just do the existing fabrics? That will make is easier to review. Once we get it in, we can add more.
I like this approach and think it is the most sensible way to get the most out of cascading 3 without breaking users.
| import org.slf4j.LoggerFactory | ||
|
|
||
| import scala.annotation.meta.param | ||
| import scala.collection.{ Map, mutable } |
There was a problem hiding this comment.
can we avoid scala.collection.Map and explicitly use Map for immutable map (standard) and mutable.Map when we need a mutable one?
| * but needs a Mode inside. | ||
| */ | ||
| private class ArgsWithMode(argsMap: Map[String, List[String]], val mode: Mode) extends Args(argsMap) { | ||
| private class ArgsWithMode(argsMap: scala.Predef.Map[String, List[String]], val mode: Mode) extends Args(argsMap) { |
There was a problem hiding this comment.
I'd rather not hide Map with an import or use scala.collection.immutable.Map here.
| Hdfs(strictSources, config) | ||
| } else | ||
| throw ArgsException("[ERROR] Mode must be one of --local, --hadoop1, --hadoop2-mr1, --hadoop2-tez or --hdfs, you provided none") | ||
| lazy val autoMode = if (args.boolean("autoCluster")) { |
There was a problem hiding this comment.
can we prefix new scalding options with scalding.?
| "hdfs" -> "com.twitter.scalding.LegacyHadoopMode", | ||
| "flink" -> "com.twitter.scalding.FlinkMode") | ||
|
|
||
| val KnownTestModesMap = Seq( |
| "flink-test" -> "com.twitter.scalding.FlinkTestMode") | ||
| // TODO: define hadoop2-mr1 (easy), tez and flink (less easy) classes. | ||
|
|
||
| private def getModeConstructor[M <: Mode](clazzName: String, types: Class[_]*) = |
There was a problem hiding this comment.
I wonder if we should just have a contract that a Mode needs a constructor that takes exactly one Args and exactly one Configuration, or even, possibly Config which is immutable. Wouldn't that work? I don't want to add too much flexibility if we don't need it.
| trait ClusterMode extends Mode { | ||
| } | ||
|
|
||
| trait HadoopMode extends Mode { |
There was a problem hiding this comment.
we have user code that assumes this. Can we find a way not to break those folks?
| * The "HadoopMode" is actually an alias for "a mode running on a fabric that ultimately runs using an execution | ||
| * engine compatible with some of the Hadoop technology stack (may or may not include Hadoop 1.x, YARN, etc.) | ||
| */ | ||
| trait HadoopExecutionModeBase[ConfigType <: Configuration] |
There was a problem hiding this comment.
maybe we actually want an abstract type here:
trait HadoopExecutionModeBase {
type ConfigType <: Configuration
}since then we can refer to it: mode.ConfigType. We might do this to put more of the cascading types and not use _ so much.
| */ | ||
| trait HadoopExecutionModeBase[ConfigType <: Configuration] | ||
| extends ExecutionMode { | ||
| def jobConf: Configuration |
There was a problem hiding this comment.
should this not return ConfigType?
|
|
||
| val memo = scala.collection.mutable.Map[String, Constructor[CounterImpl]]() | ||
| val ctor = memo.synchronized { | ||
| memo.getOrElse(klassName, |
There was a problem hiding this comment.
let's use getOrElseUpdate rather than the nested put.
| } | ||
|
|
||
| private[scalding] def upcast[T <: FlowProcess[_]](fp: FlowProcess[_])(implicit ev: TypeTag[T]): T = fp match { | ||
| case hfp: T @unchecked if (ev == typeTag[T]) => hfp // see below |
There was a problem hiding this comment.
this typeTag[T] will always match ev no? I think so. Why not just explicitly cast in this method:
def downCast[T <: U](u: U): T = u.asInstanceOf[T]Also, this is a downcast, no?
|
Cyrille Chépélov (TP12) seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
(as being discussed on scalding-dev)
In the things I've left out for now (in addition to the list in the e-mail): scalding-hadoop-test (platform-specific tests on a real minicluster).
Perhaps it'd make sense to have run / runHadoop (→runFabricLocal ?) / runOnMiniCluster in JobTest, and the relevant Mode implementation assortment?