Skip to content

Commit b51c25e

Browse files
committed
Type independent gens for TypedPipe/Execution
1 parent 428b550 commit b51c25e

File tree

7 files changed

+458
-4
lines changed

7 files changed

+458
-4
lines changed

scalding-core/src/test/scala/com/twitter/scalding/ExecutionOptimizationRulesTest.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import cascading.tuple.{Fields, Tuple}
88
import com.stripe.dagon.{Dag, Rule}
99
import com.twitter.maple.tap.MemorySourceTap
1010
import com.twitter.scalding.typed.TypedPipeGen
11+
import com.twitter.scalding.typed.gen
12+
import com.twitter.scalding.typed.gen.{ExecutionGen, TypeGen, TypeWith}
1113
import java.io.{InputStream, OutputStream}
1214
import java.util.UUID
1315
import org.scalacheck.{Arbitrary, Gen}
@@ -98,9 +100,19 @@ class ExecutionOptimizationRulesTest extends FunSuite with PropertyChecks {
98100
def write(pipe: Gen[TypedPipe[Int]]): Gen[Execution[TypedPipe[Int]]] =
99101
pipe.map(_.writeThrough(new MemorySource[Int]()))
100102

103+
def write(pipe: Gen[TypedPipe[TypeWith[TypeGen]#Type]], withT: TypeWith[TypeGen]): Gen[Execution[TypedPipe[TypeWith[TypeGen]#Type]]] = {
104+
implicit val tc = withT.evidence.tupleConverter
105+
pipe.map(_.writeThrough(new MemorySource()))
106+
}
107+
101108
val mappedOrFlatMapped =
102109
Gen.oneOf(mapped(pipe), flatMapped(pipe))
103110

111+
val stdZippedWrites =
112+
Gen.zip(gen.TypedPipeGen.pipeOfStd, gen.TypedPipeGen.pipeOfStd).flatMap { case ((tl, l), (tr, r)) =>
113+
zipped(write(l, tl), write(r, tr))
114+
}
115+
104116
val zippedWrites =
105117
zipped(write(TypedPipeGen.genWithIterableSources), write(TypedPipeGen.genWithIterableSources))
106118

@@ -175,15 +187,15 @@ class ExecutionOptimizationRulesTest extends FunSuite with PropertyChecks {
175187
}
176188

177189
test("randomly generated executions trees are invertible") {
178-
forAll(genExec) { exec =>
190+
forAll(ExecutionGen.executionOfStd) { exec =>
179191
invert(exec)
180192
}
181193
}
182194

183195
test("optimization rules are reproducible") {
184196
implicit val generatorDrivenConfig: PropertyCheckConfiguration = PropertyCheckConfiguration(minSuccessful = 500)
185197

186-
forAll(genExec, genRule) { (exec, rule) =>
198+
forAll(ExecutionGen.executionOfStd, genRule) { (exec, rule) =>
187199
val optimized = ExecutionOptimizationRules.apply(exec, rule)
188200
val optimized2 = ExecutionOptimizationRules.apply(exec, rule)
189201
assert(optimized == optimized2)
@@ -193,7 +205,7 @@ class ExecutionOptimizationRulesTest extends FunSuite with PropertyChecks {
193205
test("standard rules are reproducible") {
194206
implicit val generatorDrivenConfig: PropertyCheckConfiguration = PropertyCheckConfiguration(minSuccessful = 500)
195207

196-
forAll(genExec) { exec =>
208+
forAll(ExecutionGen.executionOfStd) { exec =>
197209
val optimized = ExecutionOptimizationRules.stdOptimizations(exec)
198210
val optimized2 = ExecutionOptimizationRules.stdOptimizations(exec)
199211
assert(optimized == optimized2)
@@ -217,7 +229,7 @@ class ExecutionOptimizationRulesTest extends FunSuite with PropertyChecks {
217229
}
218230

219231
test("zip of writes merged") {
220-
forAll(zippedWrites) { e =>
232+
forAll(stdZippedWrites) { e =>
221233
val opt = ExecutionOptimizationRules.apply(e, ZipWrite)
222234

223235
assert(e.isInstanceOf[Execution.Zipped[_, _]])
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.twitter.scalding.typed.gen
2+
3+
import com.twitter.scalding.Execution
4+
import com.twitter.scalding.typed.TypedPipe
5+
import org.scalacheck.{Cogen, Gen}
6+
7+
object ExecutionGen {
8+
import TypedPipeGen._
9+
10+
private[this] def cogen(t: TypeWith[TypeGen]): Cogen[TypedPipe[t.Type]] =
11+
Cogen[TypedPipe[t.Type]] { pipe: TypedPipe[t.Type] =>
12+
pipe.hashCode.toLong
13+
}
14+
15+
def executionOfStd: Gen[Execution[TypedPipe[TypeWith[TypeGen]#Type]]] =
16+
TypeGen.std.flatMap { t =>
17+
executionOf(t)(TypeGen.std)
18+
}
19+
20+
def executionOf(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[Execution[TypedPipe[a.Type]]] =
21+
Gen.delay(
22+
Gen.frequency(
23+
5 -> genFrom(a),
24+
1 -> genMap(a),
25+
1 -> genFlatMap(a),
26+
1 -> tg.flatMap { t =>
27+
Gen.oneOf(
28+
genZipped(a, t).map(_.map(_._1)),
29+
genZipped(t, a).map(_.map(_._2))
30+
)
31+
}
32+
)
33+
)
34+
35+
def genFrom(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[Execution[TypedPipe[a.Type]]] =
36+
pipeOf(a).map(Execution.from(_))
37+
38+
def genMap(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[Execution[TypedPipe[a.Type]]] =
39+
tg.flatMap { t =>
40+
executionOf(t).flatMap { exec =>
41+
Gen.function1(pipeOf(a))(cogen(t)).map { f =>
42+
exec.map(f)
43+
}
44+
}
45+
}
46+
47+
def genFlatMap(a: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[Execution[TypedPipe[a.Type]]] =
48+
tg.flatMap { t =>
49+
executionOf(t).flatMap { exec =>
50+
Gen.function1(executionOf(a))(cogen(t)).map { f =>
51+
exec.flatMap(f)
52+
}
53+
}
54+
}
55+
56+
def genZipped(l: TypeWith[TypeGen], r: TypeWith[TypeGen])(implicit tg: Gen[TypeWith[TypeGen]]): Gen[Execution[(TypedPipe[l.Type], TypedPipe[r.Type])]] =
57+
for {
58+
le <- executionOf(l)
59+
re <- executionOf(r)
60+
} yield le.zip(re)
61+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.twitter.scalding.typed.gen
2+
3+
import org.scalacheck.Gen
4+
import org.scalacheck.Gen._
5+
6+
object StdGen {
7+
implicit def num[A](implicit n: Numeric[A], c: Choose[A]): Gen[A] =
8+
Gen.sized(s => c.choose(n.zero, n.max(n.fromInt(s), n.zero)))
9+
10+
implicit val stringGen: Gen[String] = Gen.alphaStr
11+
12+
implicit val charGen: Gen[Char] = Gen.alphaChar
13+
14+
implicit val booleanGen: Gen[Boolean] = Gen.oneOf(true, false)
15+
16+
implicit val unitGen: Gen[Unit] = Gen.const(())
17+
18+
implicit val byteGen: Gen[Byte] = num[Byte]
19+
20+
implicit val shortGen: Gen[Short] = num[Short]
21+
22+
implicit val intGen: Gen[Int] = num[Int]
23+
24+
implicit val longGen: Gen[Long] = num[Long]
25+
26+
implicit val floatGen: Gen[Float] = num[Float]
27+
28+
implicit val doubleGen: Gen[Double] = num[Double]
29+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.twitter.scalding.typed.gen
2+
3+
import com.twitter.algebird.Semigroup
4+
5+
object StdSemigroup {
6+
implicit val byteGroup: Semigroup[Byte] = Semigroup.from { case (l, r) =>
7+
implicitly[Numeric[Byte]].plus(l, r)
8+
}
9+
10+
implicit val charGroup: Semigroup[Char] = Semigroup.from { case (l, r) =>
11+
implicitly[Numeric[Char]].plus(l, r)
12+
}
13+
14+
implicit val stringGroup: Semigroup[String] = Semigroup.from { case (l, r) =>
15+
l + r
16+
}
17+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.twitter.scalding.typed.gen
2+
3+
4+
import com.twitter.algebird.Semigroup
5+
import com.twitter.scalding.TupleConverter
6+
import org.scalacheck.{Cogen, Gen}
7+
8+
trait TypeGen[A] {
9+
val gen: Gen[A]
10+
val cogen: Cogen[A]
11+
val ordering: Ordering[A]
12+
val semigroup: Semigroup[A]
13+
val tupleConverter: TupleConverter[A]
14+
}
15+
16+
object TypeGen {
17+
import StdGen._
18+
import StdSemigroup._
19+
20+
def apply[A](g: Gen[A], c: Cogen[A], o: Ordering[A], s: Semigroup[A], t: TupleConverter[A]): TypeGen[A] =
21+
new TypeGen[A] {
22+
val gen: Gen[A] = g
23+
val cogen: Cogen[A] = c
24+
val ordering: Ordering[A] = o
25+
val semigroup: Semigroup[A] = s
26+
val tupleConverter: TupleConverter[A] = t
27+
}
28+
29+
def apply[A, B](a: TypeGen[A], b: TypeGen[B]): TypeGen[(A, B)] =
30+
new TypeGen[(A, B)] {
31+
val gen: Gen[(A, B)] = Gen.zip(a.gen, b.gen)
32+
val cogen: Cogen[(A, B)] = Cogen.tuple2(a.cogen, b.cogen)
33+
val ordering: Ordering[(A, B)] = Ordering.Tuple2(a.ordering, b.ordering)
34+
val semigroup: Semigroup[(A, B)] = Semigroup.semigroup2(a.semigroup, b.semigroup)
35+
val tupleConverter: TupleConverter[(A, B)] =
36+
TupleConverter.build(a.tupleConverter.arity + b.tupleConverter.arity) { te =>
37+
val ta = a.tupleConverter.apply(te)
38+
val tb = b.tupleConverter.apply(te)
39+
(ta, tb)
40+
}
41+
}
42+
43+
implicit def typeGen[A: Gen: Cogen: Ordering: Semigroup: TupleConverter]: TypeGen[A] =
44+
TypeGen(implicitly, implicitly, implicitly, implicitly, implicitly)
45+
46+
implicit val std: Gen[TypeWith[TypeGen]] =
47+
Gen.oneOf(
48+
TypeWith[Unit, TypeGen],
49+
TypeWith[Boolean, TypeGen],
50+
TypeWith[Byte, TypeGen],
51+
TypeWith[Char, TypeGen],
52+
TypeWith[Short, TypeGen],
53+
TypeWith[Int, TypeGen],
54+
TypeWith[Long, TypeGen],
55+
TypeWith[Float, TypeGen],
56+
TypeWith[Double, TypeGen],
57+
TypeWith[String, TypeGen]
58+
)
59+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.twitter.scalding.typed.gen
2+
3+
sealed abstract class TypeWith[+Ev[_]] {
4+
type Type
5+
def evidence: Ev[Type]
6+
}
7+
8+
object TypeWith {
9+
type Aux[A, Ev[_]] = TypeWith[Ev] { type Type = A }
10+
11+
def apply[A, Ev[_]](implicit eva: Ev[A]): Aux[A, Ev] =
12+
new TypeWith[Ev] {
13+
type Type = A
14+
def evidence: Ev[Type] = eva
15+
}
16+
}

0 commit comments

Comments
 (0)