Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public void execute(final ExecutionStage stage, final OptimizationContext optimi
final SqlQueryChannel.Instance queryChannel = pair.field1;

queryChannel.setSqlQuery(query);

// Return the tipChannelInstance.
executionState.register(queryChannel);
}
Expand All @@ -99,10 +98,35 @@ public void execute(final ExecutionStage stage, final OptimizationContext optimi
* @return the said follow-up {@link ExecutionTask} or {@code null} if none
*/
private static ExecutionTask findJdbcExecutionOperatorTaskInStage(final ExecutionTask task, final ExecutionStage stage) {
assert task.getNumOuputChannels() == 1;

if (task.getNumOuputChannels() != 1) {
throw new WayangException(
"JdbcExecutor expected exactly one output channel but found "
+ task.getNumOuputChannels() + " for task: " + task
);
}

final Channel outputChannel = task.getOutputChannel(0);
final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers());
return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator ? consumer
final Collection<ExecutionTask> consumers = outputChannel.getConsumers();

if (consumers == null || consumers.isEmpty()) {
throw new WayangException(
"JdbcExecutor expected exactly one consumer for output channel but found none: "
+ outputChannel
);
}

if (consumers.size() > 1) {
throw new WayangException(
"JdbcExecutor expected exactly one consumer but found multiple: "
+ consumers
);
}

final ExecutionTask consumer = consumers.iterator().next();

return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator
? consumer
: null;
}

Expand Down Expand Up @@ -142,11 +166,12 @@ private static SqlQueryChannel.Instance instantiateOutboundChannel(final Executi
private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task,
final OptimizationContext optimizationContext,
final SqlQueryChannel.Instance predecessorChannelInstance, final JdbcExecutor jdbcExecutor) {
final SqlQueryChannel.Instance newInstance = JdbcExecutor.instantiateOutboundChannel(task, optimizationContext, jdbcExecutor);
final SqlQueryChannel.Instance newInstance =
JdbcExecutor.instantiateOutboundChannel(task, optimizationContext, jdbcExecutor);
newInstance.getLineage().addPredecessor(predecessorChannelInstance.getLineage());
return newInstance;
}

/**
* Creates a query channel and the sql statement
*
Expand All @@ -158,69 +183,84 @@ protected static Tuple2<String, SqlQueryChannel.Instance> createSqlQuery(final E
final OptimizationContext context, final JdbcExecutor jdbcExecutor) {
final Collection<?> startTasks = stage.getStartTasks();
final Collection<?> termTasks = stage.getTerminalTasks();

// Verify that we can handle this instance.
assert startTasks.size() == 1 : "Invalid jdbc stage: multiple sources are not currently supported";
final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0];
assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported.";
final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0];
assert startTask.getOperator() instanceof TableSource
: "Invalid JDBC stage: Start task has to be a TableSource";

// Extract the different types of ExecutionOperators from the stage.
final JdbcTableSource tableOp = (JdbcTableSource) startTask.getOperator();
SqlQueryChannel.Instance tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor);
SqlQueryChannel.Instance tipChannelInstance =
JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor);

final Collection<JdbcFilterOperator> filterTasks = new ArrayList<>(4);
JdbcProjectionOperator projectionTask = null;
final Collection<JdbcJoinOperator<?>> joinTasks = new ArrayList<>();

final Set<ExecutionTask> allTasks = stage.getAllTasks();
assert allTasks.size() <= 3;

ExecutionTask nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage);

while (nextTask != null) {
// Evaluate the nextTask.
if (nextTask.getOperator() instanceof final JdbcFilterOperator filterOperator) {
filterTasks.add(filterOperator);
} else if (nextTask.getOperator() instanceof JdbcProjectionOperator projectionOperator) {
assert projectionTask == null; // Allow one projection operator per stage for now.
assert projectionTask == null;
projectionTask = projectionOperator;
} else if (nextTask.getOperator() instanceof JdbcJoinOperator joinOperator) {
joinTasks.add(joinOperator);
} else {
throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString()));
throw new WayangException(
String.format("Unsupported JDBC execution task %s", nextTask.toString()));
}

// Move the tipChannelInstance.
tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(nextTask, context, tipChannelInstance, jdbcExecutor);

tipChannelInstance =
JdbcExecutor.instantiateOutboundChannel(nextTask, context, tipChannelInstance, jdbcExecutor);
// Go to the next nextTask.
nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(nextTask, stage);
}

// Create the SQL query.
final StringBuilder query = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks);
final StringBuilder query =
createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks);

return new Tuple2<>(query.toString(), tipChannelInstance);
}

public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, final JdbcTableSource tableOp,
final Collection<JdbcFilterOperator> filterTasks, JdbcProjectionOperator projectionTask,
public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor,
final JdbcTableSource tableOp,
final Collection<JdbcFilterOperator> filterTasks,
JdbcProjectionOperator projectionTask,
final Collection<JdbcJoinOperator<?>> joinTasks) {
final String tableName = tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);

final String tableName =
tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);

final Collection<String> conditions = filterTasks.stream()
.map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler))
.collect(Collectors.toList());
final String projection = projectionTask == null ? "*" : projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);

final String projection =
projectionTask == null ? "*" :
projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);

final Collection<String> joins = joinTasks.stream()
.map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler))
.collect(Collectors.toList());

final StringBuilder sb = new StringBuilder(1000);

sb.append("SELECT ").append(projection).append(" FROM ").append(tableName);

if (!joins.isEmpty()) {
final String separator = " ";
for (final String join : joins) {
sb.append(separator).append(join);
sb.append(" ").append(join);
}
}

if (!conditions.isEmpty()) {
sb.append(" WHERE ");
String separator = "";
Expand All @@ -229,7 +269,9 @@ public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, fin
separator = " AND ";
}
}

sb.append(';');

return sb;
}

Expand All @@ -250,24 +292,32 @@ public Platform getPlatform() {
private void saveResult(final FileChannel.Instance outputFileChannelInstance, final ResultSet rs)
throws IOException, SQLException {
// Output results.
final FileSystem outFs = FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get();
try (final OutputStreamWriter writer = new OutputStreamWriter(
outFs.create(outputFileChannelInstance.getSinglePath()))) {
final FileSystem outFs =
FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get();

try (final OutputStreamWriter writer =
new OutputStreamWriter(outFs.create(outputFileChannelInstance.getSinglePath()))) {

while (rs.next()) {
// System.out.println(rs.getInt(1) + " " + rs.getString(2));
final ResultSetMetaData rsmd = rs.getMetaData();

for (int i = 1; i <= rsmd.getColumnCount(); i++) {

writer.write(rs.getString(i));

if (i < rsmd.getColumnCount()) {
writer.write('\t');
}
}

if (!rs.isLast()) {
writer.write('\n');
}
}

} catch (final UncheckedIOException e) {
throw e.getCause();
}
}
}
}
Loading