Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.twitter.scalding

import com.twitter.scalding.typed.CoGroupable
import com.twitter.scalding.typed.{TypedPipe, Input, Output, CoGroupable, KeyedList}
import org.slf4j.LoggerFactory
import scala.reflect.runtime.universe
import scala.reflect.runtime.universe.{NullaryMethodType, RuntimeMirror, Symbol, Type, TypeRef}
Expand All @@ -13,19 +13,12 @@ object ReferencedClassFinder {
private val baseContainers = List(
classOf[Execution[_]],
classOf[TypedPipe[_]],
classOf[TypedSink[_]],
classOf[TypedSource[_]],
classOf[Input[_]],
classOf[Output[_]],
classOf[CoGroupable[_, _]],
classOf[KeyedList[_, _]]
)

/**
* Add the given type, as well as all referenced types to the cascading tokens list. note, for maximal
* efficiency, you should also register those types with the kryo instantiator being used.
*/
def addCascadingTokensFrom(c: Class[_], config: Config): Config =
CascadingTokenUpdater.update(config, findReferencedClasses(c) + c)

/**
* Reflect over a scalding job to try and identify types it uses so they can be tokenized by cascading.
* Since scala reflection is broken with the Hadoop InterfaceAudiance annotation (see
Expand All @@ -35,15 +28,22 @@ object ReferencedClassFinder {
*
* Note: this not guaranteed to find every used type. Eg, it can't find types used in a step that isn't
* referred to in a field
*
* This by default only traverses fields of outerClass which are TypedPipe or Executions
*/
def findReferencedClasses(outerClass: Class[_]): Set[Class[_]] = {
def findReferencedClasses(outerClass: Class[_]): Set[Class[_]] =
findReferencedClassesWhere(outerClass) { cls =>
baseContainers.exists(_.isAssignableFrom(cls))
}

def findReferencedClassesWhere(outerClass: Class[_])(fieldFilter: Class[_] => Boolean): Set[Class[_]] = {
val scalaPackage = Package.getPackage("scala")
val mirror = universe.runtimeMirror(outerClass.getClassLoader)
getClassType(outerClass, mirror) match {
case Some(scalaType) =>
(for {
field <- outerClass.getDeclaredFields
if baseContainers.exists(_.isAssignableFrom(field.getType))
if fieldFilter(field.getType)
scalaSignature <- getFieldType(outerClass, scalaType, field).toSeq
clazz <- getClassesForType(mirror, scalaSignature)
/* The scala root package contains a lot of shady stuff, eg compile-time wrappers (scala.Int/Array etc),
Expand Down