diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java index f7a9d7c5a..938010897 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java @@ -83,7 +83,6 @@ public void execute(final ExecutionStage stage, final OptimizationContext optimi final SqlQueryChannel.Instance queryChannel = pair.field1; queryChannel.setSqlQuery(query); - // Return the tipChannelInstance. executionState.register(queryChannel); } @@ -99,10 +98,35 @@ public void execute(final ExecutionStage stage, final OptimizationContext optimi * @return the said follow-up {@link ExecutionTask} or {@code null} if none */ private static ExecutionTask findJdbcExecutionOperatorTaskInStage(final ExecutionTask task, final ExecutionStage stage) { - assert task.getNumOuputChannels() == 1; + + if (task.getNumOuputChannels() != 1) { + throw new WayangException( + "JdbcExecutor expected exactly one output channel but found " + + task.getNumOuputChannels() + " for task: " + task + ); + } + final Channel outputChannel = task.getOutputChannel(0); - final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers()); - return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator ? consumer + final Collection consumers = outputChannel.getConsumers(); + + if (consumers == null || consumers.isEmpty()) { + throw new WayangException( + "JdbcExecutor expected exactly one consumer for output channel but found none: " + + outputChannel + ); + } + + if (consumers.size() > 1) { + throw new WayangException( + "JdbcExecutor expected exactly one consumer but found multiple: " + + consumers + ); + } + + final ExecutionTask consumer = consumers.iterator().next(); + + return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator + ? consumer : null; } @@ -142,11 +166,12 @@ private static SqlQueryChannel.Instance instantiateOutboundChannel(final Executi private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task, final OptimizationContext optimizationContext, final SqlQueryChannel.Instance predecessorChannelInstance, final JdbcExecutor jdbcExecutor) { - final SqlQueryChannel.Instance newInstance = JdbcExecutor.instantiateOutboundChannel(task, optimizationContext, jdbcExecutor); + final SqlQueryChannel.Instance newInstance = + JdbcExecutor.instantiateOutboundChannel(task, optimizationContext, jdbcExecutor); newInstance.getLineage().addPredecessor(predecessorChannelInstance.getLineage()); return newInstance; } - + /** * Creates a query channel and the sql statement * @@ -158,7 +183,6 @@ protected static Tuple2 createSqlQuery(final E final OptimizationContext context, final JdbcExecutor jdbcExecutor) { final Collection startTasks = stage.getStartTasks(); final Collection termTasks = stage.getTerminalTasks(); - // Verify that we can handle this instance. assert startTasks.size() == 1 : "Invalid jdbc stage: multiple sources are not currently supported"; final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0]; @@ -166,61 +190,77 @@ protected static Tuple2 createSqlQuery(final E final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0]; assert startTask.getOperator() instanceof TableSource : "Invalid JDBC stage: Start task has to be a TableSource"; - // Extract the different types of ExecutionOperators from the stage. final JdbcTableSource tableOp = (JdbcTableSource) startTask.getOperator(); - SqlQueryChannel.Instance tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor); + SqlQueryChannel.Instance tipChannelInstance = + JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor); + final Collection filterTasks = new ArrayList<>(4); JdbcProjectionOperator projectionTask = null; final Collection> joinTasks = new ArrayList<>(); + final Set allTasks = stage.getAllTasks(); assert allTasks.size() <= 3; + ExecutionTask nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage); + while (nextTask != null) { // Evaluate the nextTask. if (nextTask.getOperator() instanceof final JdbcFilterOperator filterOperator) { filterTasks.add(filterOperator); } else if (nextTask.getOperator() instanceof JdbcProjectionOperator projectionOperator) { - assert projectionTask == null; // Allow one projection operator per stage for now. + assert projectionTask == null; projectionTask = projectionOperator; } else if (nextTask.getOperator() instanceof JdbcJoinOperator joinOperator) { joinTasks.add(joinOperator); } else { - throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString())); + throw new WayangException( + String.format("Unsupported JDBC execution task %s", nextTask.toString())); } - // Move the tipChannelInstance. - tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(nextTask, context, tipChannelInstance, jdbcExecutor); - + tipChannelInstance = + JdbcExecutor.instantiateOutboundChannel(nextTask, context, tipChannelInstance, jdbcExecutor); // Go to the next nextTask. nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(nextTask, stage); } - // Create the SQL query. - final StringBuilder query = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks); + final StringBuilder query = + createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks); + return new Tuple2<>(query.toString(), tipChannelInstance); } - public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, final JdbcTableSource tableOp, - final Collection filterTasks, JdbcProjectionOperator projectionTask, + public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, + final JdbcTableSource tableOp, + final Collection filterTasks, + JdbcProjectionOperator projectionTask, final Collection> joinTasks) { - final String tableName = tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); + + final String tableName = + tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); + final Collection conditions = filterTasks.stream() .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) .collect(Collectors.toList()); - final String projection = projectionTask == null ? "*" : projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); + + final String projection = + projectionTask == null ? "*" : + projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); + final Collection joins = joinTasks.stream() .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) .collect(Collectors.toList()); final StringBuilder sb = new StringBuilder(1000); + sb.append("SELECT ").append(projection).append(" FROM ").append(tableName); + if (!joins.isEmpty()) { - final String separator = " "; for (final String join : joins) { - sb.append(separator).append(join); + sb.append(" ").append(join); } } + if (!conditions.isEmpty()) { sb.append(" WHERE "); String separator = ""; @@ -229,7 +269,9 @@ public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, fin separator = " AND "; } } + sb.append(';'); + return sb; } @@ -250,24 +292,32 @@ public Platform getPlatform() { private void saveResult(final FileChannel.Instance outputFileChannelInstance, final ResultSet rs) throws IOException, SQLException { // Output results. - final FileSystem outFs = FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get(); - try (final OutputStreamWriter writer = new OutputStreamWriter( - outFs.create(outputFileChannelInstance.getSinglePath()))) { + final FileSystem outFs = + FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get(); + + try (final OutputStreamWriter writer = + new OutputStreamWriter(outFs.create(outputFileChannelInstance.getSinglePath()))) { + while (rs.next()) { // System.out.println(rs.getInt(1) + " " + rs.getString(2)); final ResultSetMetaData rsmd = rs.getMetaData(); + for (int i = 1; i <= rsmd.getColumnCount(); i++) { + writer.write(rs.getString(i)); + if (i < rsmd.getColumnCount()) { writer.write('\t'); } } + if (!rs.isLast()) { writer.write('\n'); } } + } catch (final UncheckedIOException e) { throw e.getCause(); } } -} +} \ No newline at end of file