From ffdf5827f0eca4d7951719f4054be20a545bae2f Mon Sep 17 00:00:00 2001 From: Makarand Milind Hinge Date: Wed, 11 Mar 2026 22:55:54 +0530 Subject: [PATCH 1/5] fix(benchmark): Remove hardcoded paths in Grep.java - Replace hardcoded input path with configurable argument - Replace hardcoded output path 'lala.out' with configurable argument - Add default output path (the input path with a '.out' suffix) if not specified - Add usage message and argument validation - Addresses issue #418 --- .../java/org/apache/wayang/apps/grep/Grep.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/grep/Grep.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/grep/Grep.java index 9ece8ab8c..c647af403 100644 --- a/wayang-benchmark/src/main/java/org/apache/wayang/apps/grep/Grep.java +++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/grep/Grep.java @@ -98,11 +98,19 @@ public static void wayangFlink(String input, String output){ } public static void main(String... args) throws Exception { + if (args.length < 3) { + System.err.println("Usage: [output-file]"); + System.err.println(" size: dataset size indicator"); + System.err.println(" platform: so|pure-java|pure-spark|pure-flink|wayang-java|wayang-spark|wayang-flink"); + System.err.println(" input-file: full path to input file"); + System.err.println(" output-file: full path to output file (optional, defaults to .out)"); + System.exit(1); + } + int size = Integer.parseInt(args[0]); String platform = args[1]; - - String input = args[2]+"/python/src/pywy/tests/resources/10e"+size+"MB.input"; - String output = args[2]+"/lala.out"; + String input = args[2]; + String output = args.length > 3 ? 
args[3] : args[2] + ".out"; String[] command = {"rm", "-r", output}; Process process = Runtime.getRuntime().exec(command); From f8d7ff6de24a3c3f5569a79c0c1fd5a339dfd11c Mon Sep 17 00:00:00 2001 From: Makarand Milind Hinge Date: Wed, 11 Mar 2026 23:10:50 +0530 Subject: [PATCH 2/5] cleanup: Remove unused imports and add Configuration in Main.java --- .../java/org/apache/wayang/apps/wordcount/Main.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/Main.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/Main.java index 389c5ee46..a811af7d6 100644 --- a/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/Main.java +++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/Main.java @@ -18,24 +18,17 @@ package org.apache.wayang.apps.wordcount; +import org.apache.wayang.api.JavaPlanBuilder; import org.apache.wayang.basic.data.Tuple2; import org.apache.wayang.core.api.Configuration; import org.apache.wayang.core.api.WayangContext; -import org.apache.wayang.core.plan.wayangplan.WayangPlan; -import org.apache.wayang.core.util.ReflectionUtils; import org.apache.wayang.java.Java; -import org.apache.wayang.java.platform.JavaPlatform; import org.apache.wayang.spark.Spark; -import org.apache.wayang.api.JavaPlanBuilder; - import java.io.IOException; import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.OptionalDouble; public class Main { @@ -46,7 +39,7 @@ public static void main(String[] args) throws IOException, URISyntaxException { System.exit(1); } - WayangContext wayangContext = new WayangContext(); + WayangContext wayangContext = new WayangContext(new Configuration()); for (String platform : args[0].split(",")) { switch (platform) { case "java": From 8749e6632ec8e2831463c6071c2014afa5502096 Mon Sep 17 00:00:00 2001 From: Makarand 
Milind Hinge Date: Wed, 11 Mar 2026 23:14:36 +0530 Subject: [PATCH 3/5] cleanup: Remove unused import and add Configuration in WordCountParquet.java --- .../org/apache/wayang/apps/wordcount/WordCountParquet.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCountParquet.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCountParquet.java index fd9b03404..f7a53785d 100644 --- a/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCountParquet.java +++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCountParquet.java @@ -20,7 +20,7 @@ import org.apache.wayang.api.JavaPlanBuilder; import org.apache.wayang.basic.data.Tuple2; -import org.apache.wayang.basic.operators.ParquetSource; +import org.apache.wayang.core.api.Configuration; import org.apache.wayang.core.api.WayangContext; import org.apache.wayang.java.Java; import org.apache.wayang.spark.Spark; @@ -37,7 +37,7 @@ public static void main(String[] args){ System.exit(1); } - WayangContext wayangContext = new WayangContext(); + WayangContext wayangContext = new WayangContext(new Configuration()); for (String platform : args[0].split(",")) { switch (platform) { case "java": From eb6ee693b9bf061e204da02a85946560504589b3 Mon Sep 17 00:00:00 2001 From: Makarand Milind Hinge Date: Wed, 11 Mar 2026 23:30:34 +0530 Subject: [PATCH 4/5] cleanup: Remove commented code in WordCount.java --- .../main/java/org/apache/wayang/apps/wordcount/WordCount.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCount.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCount.java index e2997a588..f32b382ba 100644 --- a/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCount.java +++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCount.java @@ -42,7 +42,6 @@ 
public static void main(String[] args){ WayangContext wayangContext = new WayangContext(new Configuration()) .withPlugin(Java.basicPlugin()) .withPlugin(Spark.basicPlugin()); - // .withPlugin(Flink.basicPlugin()); JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext) .withJobName("WordCount") @@ -55,7 +54,6 @@ public static void main(String[] args){ /* Split each line by non-word characters */ .flatMap(line -> Arrays.asList(line.split("\\W+"))) - // .withSelectivity(1, 100, 0.9) .withName("Split words") /* Filter empty tokens */ From bf7a70861d0c7672fa8109a903e73ddc792cd963 Mon Sep 17 00:00:00 2001 From: Makarand Milind Hinge Date: Thu, 12 Mar 2026 18:11:12 +0530 Subject: [PATCH 5/5] feat: Add JavaPlanBuilder benchmark versions Addresses #418 --- .../apps/tpch/TPCHQ1WithPlanBuilder.java | 168 ++++++++++++++++++ .../wordcount/WordCountWithPlanBuilder.java | 116 ++++++++++++ 2 files changed, 284 insertions(+) create mode 100644 wayang-benchmark/src/main/java/org/apache/wayang/apps/tpch/TPCHQ1WithPlanBuilder.java create mode 100644 wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCountWithPlanBuilder.java diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/tpch/TPCHQ1WithPlanBuilder.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/tpch/TPCHQ1WithPlanBuilder.java new file mode 100644 index 000000000..bde0c8858 --- /dev/null +++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/tpch/TPCHQ1WithPlanBuilder.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.apps.tpch; + +import org.apache.wayang.api.JavaPlanBuilder; +import org.apache.wayang.apps.tpch.data.LineItemTuple; +import org.apache.wayang.apps.tpch.data.q1.GroupKey; +import org.apache.wayang.apps.tpch.data.q1.ReturnTuple; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.WayangContext; +import org.apache.wayang.java.Java; +import org.apache.wayang.spark.Spark; + +import java.util.Collection; + +/** + * TPC-H Query 1 implementation using JavaPlanBuilder API. + * This is the modern, fluent API version. Compare with {@link TPCHQ1WithJavaNative} + * to see the differences between the native operator API and the JavaPlanBuilder API. + */ +public class TPCHQ1WithPlanBuilder { + + /** + * Executes TPC-H Query 1, which is as follows: + *
+     * select
+     *  l_returnflag,
+     *  l_linestatus,
+     *  sum(l_quantity) as sum_qty,
+     *  sum(l_extendedprice) as sum_base_price,
+     *  sum(l_extendedprice*(1-l_discount)) as sum_disc_price,
+     *  sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,
+     *  avg(l_quantity) as avg_qty,
+     *  avg(l_extendedprice) as avg_price,
+     *  avg(l_discount) as avg_disc,
+     *  count(*) as count_order
+     * from
+     *  lineitem
+     * where
+     *  l_shipdate <= date '1998-12-01' - interval '[DELTA]' day (3)
+     * group by
+     *  l_returnflag,
+     *  l_linestatus
+     * order by
+     *  l_returnflag,
+     *  l_linestatus;
+     * 
+ * + * @param wayangContext the Wayang context + * @param lineItemUrl URL to the lineitem CSV file + * @param delta the {@code [DELTA]} parameter + * @return Collection of query results + */ + private static Collection executeQ1(WayangContext wayangContext, String lineItemUrl, final int delta) { + final int maxShipdate = LineItemTuple.Parser.parseDate("1998-12-01") - delta; + + JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext) + .withJobName("TPC-H Q1") + .withUdfJarOf(TPCHQ1WithPlanBuilder.class); + + return planBuilder + // Read the lineitem table + .readTextFile(lineItemUrl).withName("Load lineitem") + + // Parse the rows + .map(line -> new LineItemTuple.Parser().parse(line)) + .withName("Parse lineitem") + + // Filter by shipdate + .filter(tuple -> tuple.L_SHIPDATE <= maxShipdate) + .withName("Filter by shipdate") + + // Project the queried attributes + .map(lineItemTuple -> new ReturnTuple( + lineItemTuple.L_RETURNFLAG, + lineItemTuple.L_LINESTATUS, + lineItemTuple.L_QUANTITY, + lineItemTuple.L_EXTENDEDPRICE, + lineItemTuple.L_EXTENDEDPRICE * (1 - lineItemTuple.L_DISCOUNT), + lineItemTuple.L_EXTENDEDPRICE * (1 - lineItemTuple.L_DISCOUNT) * (1 + lineItemTuple.L_TAX), + lineItemTuple.L_QUANTITY, + lineItemTuple.L_EXTENDEDPRICE, + lineItemTuple.L_DISCOUNT, + 1)) + .withName("Project attributes") + + // Aggregation: group by returnflag and linestatus + .reduceByKey( + returnTuple -> new GroupKey(returnTuple.L_RETURNFLAG, returnTuple.L_LINESTATUS), + (t1, t2) -> { + t1.SUM_QTY += t2.SUM_QTY; + t1.SUM_BASE_PRICE += t2.SUM_BASE_PRICE; + t1.SUM_DISC_PRICE += t2.SUM_DISC_PRICE; + t1.SUM_CHARGE += t2.SUM_CHARGE; + t1.AVG_QTY += t2.AVG_QTY; + t1.AVG_PRICE += t2.AVG_PRICE; + t1.AVG_DISC += t2.AVG_DISC; + t1.COUNT_ORDER += t2.COUNT_ORDER; + return t1; + }) + .withName("Aggregate") + + // Finalize AVG operations + .map(t -> { + t.AVG_QTY /= t.COUNT_ORDER; + t.AVG_PRICE /= t.COUNT_ORDER; + t.AVG_DISC /= t.COUNT_ORDER; + return t; + }) + .withName("Finalize 
aggregation") + + // Execute and collect results + .collect(); + } + + public static void main(String[] args) { + if (args.length == 0) { + System.err.print("Usage: [,]* *"); + System.exit(1); + } + + WayangContext wayangContext = new WayangContext(new Configuration()); + for (String platform : args[0].split(",")) { + switch (platform) { + case "java": + wayangContext.register(Java.basicPlugin()); + break; + case "spark": + wayangContext.register(Spark.basicPlugin()); + break; + default: + System.err.format("Unknown platform: \"%s\"\n", platform); + System.exit(3); + return; + } + } + + Collection results; + switch (Integer.parseInt(args[1])) { + case 1: + results = executeQ1(wayangContext, args[2], Integer.parseInt(args[3])); + break; + default: + System.err.println("Unsupported query number."); + System.exit(2); + return; + } + + // Print results + results.forEach(System.out::println); + } +} diff --git a/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCountWithPlanBuilder.java b/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCountWithPlanBuilder.java new file mode 100644 index 000000000..bd13976aa --- /dev/null +++ b/wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCountWithPlanBuilder.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.apps.wordcount; + +import org.apache.wayang.api.JavaPlanBuilder; +import org.apache.wayang.basic.data.Tuple2; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.WayangContext; +import org.apache.wayang.flink.Flink; +import org.apache.wayang.java.Java; +import org.apache.wayang.spark.Spark; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Collection; + +/** + * Example Apache Wayang (incubating) App that does a word count using JavaPlanBuilder API. + * This is the modern, fluent API version. Compare with {@link WordCountWithJavaNativeAPI} + * to see the differences between the native operator API and the JavaPlanBuilder API. + */ +public class WordCountWithPlanBuilder { + + public static void main(String[] args) throws IOException, URISyntaxException { + try { + if (args.length == 0) { + System.err.print("Usage: [,]* "); + System.exit(1); + } + + /* Initialize WayangContext and register platforms */ + WayangContext wayangContext = new WayangContext(new Configuration()); + for (String platform : args[0].split(",")) { + switch (platform) { + case "java": + wayangContext.register(Java.basicPlugin()); + break; + case "spark": + wayangContext.register(Spark.basicPlugin()); + break; + case "flink": + wayangContext.register(Flink.basicPlugin()); + break; + default: + System.err.format("Unknown platform: \"%s\"\n", platform); + System.exit(3); + return; + } + } + + /* Get a plan builder */ + JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext) + .withJobName("WordCount") + .withUdfJarOf(WordCountWithPlanBuilder.class); + + /* Build and execute the Apache Wayang Plan using fluent API */ + Collection> wordcounts = planBuilder + /* Read the text file */ + .readTextFile(args[1]).withName("Load file") + + /* Split each line by 
non-word characters */ + .flatMap(line -> Arrays.asList(line.split("\\W+"))) + .withSelectivity(1, 100, 0.8) + .withName("Split words") + + /* Filter empty tokens */ + .filter(token -> !token.isEmpty()) + .withName("Filter empty words") + + /* Attach counter to each word */ + .map(word -> new Tuple2<>(word.toLowerCase(), 1)) + .withName("To lower case, add counter") + + /* Sum up counters for every word */ + .reduceByKey( + Tuple2::getField0, + (a, b) -> { + a.field1 += b.field1; + return a; + } + ) + .withName("Add counters") + + /* Execute the plan and collect the results */ + .collect(); + + /* Display results */ + wordcounts.stream() + .sorted((t1, t2) -> Integer.compare(t2.field1, t1.field1)) + .forEach(wc -> System.out.printf("%dx %s\n", wc.field1, wc.field0)); + System.out.printf("Found %d words:\n", wordcounts.size()); + + } catch (Exception e) { + System.err.println("App failed."); + e.printStackTrace(); + System.exit(4); + } + } +}