diff --git a/scalding-core/src/main/scala/com/twitter/scalding/ReferencedClassFinder.scala b/scalding-base/src/main/scala/com/twitter/scalding/ReferencedClassFinder.scala similarity index 89% rename from scalding-core/src/main/scala/com/twitter/scalding/ReferencedClassFinder.scala rename to scalding-base/src/main/scala/com/twitter/scalding/ReferencedClassFinder.scala index 0a12e516c..4c712d3a1 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/ReferencedClassFinder.scala +++ b/scalding-base/src/main/scala/com/twitter/scalding/ReferencedClassFinder.scala @@ -1,6 +1,6 @@ package com.twitter.scalding -import com.twitter.scalding.typed.CoGroupable +import com.twitter.scalding.typed.{TypedPipe, Input, Output, CoGroupable, KeyedList} import org.slf4j.LoggerFactory import scala.reflect.runtime.universe import scala.reflect.runtime.universe.{NullaryMethodType, RuntimeMirror, Symbol, Type, TypeRef} @@ -13,19 +13,12 @@ object ReferencedClassFinder { private val baseContainers = List( classOf[Execution[_]], classOf[TypedPipe[_]], - classOf[TypedSink[_]], - classOf[TypedSource[_]], + classOf[Input[_]], + classOf[Output[_]], classOf[CoGroupable[_, _]], classOf[KeyedList[_, _]] ) - /** - * Add the given type, as well as all referenced types to the cascading tokens list. note, for maximal - * efficiency, you should also register those types with the kryo instantiator being used. - */ - def addCascadingTokensFrom(c: Class[_], config: Config): Config = - CascadingTokenUpdater.update(config, findReferencedClasses(c) + c) - /** * Reflect over a scalding job to try and identify types it uses so they can be tokenized by cascading. * Since scala reflection is broken with the Hadoop InterfaceAudiance annotation (see @@ -35,15 +28,22 @@ object ReferencedClassFinder { * * Note: this not guaranteed to find every used type. Eg, it can't find types used in a step that isn't * referred to in a field + * + * This by default only traverses fields of outerClass which are TypedPipe or Executions */ - def findReferencedClasses(outerClass: Class[_]): Set[Class[_]] = { + def findReferencedClasses(outerClass: Class[_]): Set[Class[_]] = + findReferencedClassesWhere(outerClass) { cls => + baseContainers.exists(_.isAssignableFrom(cls)) + } + + def findReferencedClassesWhere(outerClass: Class[_])(fieldFilter: Class[_] => Boolean): Set[Class[_]] = { val scalaPackage = Package.getPackage("scala") val mirror = universe.runtimeMirror(outerClass.getClassLoader) getClassType(outerClass, mirror) match { case Some(scalaType) => (for { field <- outerClass.getDeclaredFields - if baseContainers.exists(_.isAssignableFrom(field.getType)) + if fieldFilter(field.getType) scalaSignature <- getFieldType(outerClass, scalaType, field).toSeq clazz <- getClassesForType(mirror, scalaSignature) /* The scala root package contains a lot of shady stuff, eg compile-time wrappers (scala.Int/Array etc),