Skip to content

Commit 8a09cca

Browse files
committed
Add getSplits
1 parent 27eb1b9 commit 8a09cca

31 files changed

+377
-75
lines changed

presto-hive/pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,6 @@
110110
<artifactId>presto-rcfile</artifactId>
111111
</dependency>
112112

113-
<dependency>
114-
<groupId>com.facebook.presto</groupId>
115-
<artifactId>presto-native-tvf</artifactId>
116-
</dependency>
117-
118113
<dependency>
119114
<groupId>com.facebook.presto.hadoop</groupId>
120115
<artifactId>hadoop-apache2</artifactId>

presto-main-base/src/main/java/com/facebook/presto/connector/system/GlobalSystemConnector.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.facebook.presto.common.RuntimeStats;
1717
import com.facebook.presto.common.transaction.TransactionId;
18+
import com.facebook.presto.metadata.FunctionAndTypeManager;
1819
import com.facebook.presto.operator.table.ExcludeColumns;
1920
import com.facebook.presto.operator.table.Sequence;
2021
import com.facebook.presto.operator.table.Sequence.SequenceFunctionHandle;
@@ -30,6 +31,7 @@
3031
import com.facebook.presto.spi.ConnectorTableLayoutResult;
3132
import com.facebook.presto.spi.ConnectorTableMetadata;
3233
import com.facebook.presto.spi.Constraint;
34+
import com.facebook.presto.spi.NodeManager;
3335
import com.facebook.presto.spi.SchemaTableName;
3436
import com.facebook.presto.spi.SchemaTablePrefix;
3537
import com.facebook.presto.spi.SplitContext;
@@ -54,7 +56,6 @@
5456
import java.util.Set;
5557
import java.util.function.Function;
5658

57-
import static com.facebook.presto.operator.table.Sequence.getSequenceFunctionSplitSource;
5859
import static java.util.Objects.requireNonNull;
5960

6061
public class GlobalSystemConnector
@@ -66,13 +67,17 @@ public class GlobalSystemConnector
6667
private final Set<SystemTable> systemTables;
6768
private final Set<Procedure> procedures;
6869
private final Set<ConnectorTableFunction> tableFunctions;
70+
private final NodeManager nodeManager;
71+
private final FunctionAndTypeManager functionAndTypeManager;
6972

70-
public GlobalSystemConnector(String connectorId, Set<SystemTable> systemTables, Set<Procedure> procedures, Set<ConnectorTableFunction> tableFunctions)
73+
public GlobalSystemConnector(String connectorId, Set<SystemTable> systemTables, Set<Procedure> procedures, Set<ConnectorTableFunction> tableFunctions, NodeManager nodeManager, FunctionAndTypeManager functionAndTypeManager)
7174
{
7275
this.connectorId = requireNonNull(connectorId, "connectorId is null");
7376
this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null"));
7477
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
7578
this.tableFunctions = ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null"));
79+
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
80+
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
7681
}
7782

7883
@Override
@@ -160,11 +165,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand
160165
@Override
161166
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle function)
162167
{
163-
if (function instanceof SequenceFunctionHandle) {
164-
SequenceFunctionHandle sequenceFunctionHandle = (SequenceFunctionHandle) function;
165-
return getSequenceFunctionSplitSource(sequenceFunctionHandle);
166-
}
167-
throw new UnsupportedOperationException();
168+
return function.getSplits(transaction, session, nodeManager, functionAndTypeManager);
168169
}
169170
};
170171
}

presto-main-base/src/main/java/com/facebook/presto/connector/system/GlobalSystemConnectorFactory.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
*/
1414
package com.facebook.presto.connector.system;
1515

16+
import com.facebook.presto.metadata.FunctionAndTypeManager;
1617
import com.facebook.presto.spi.ConnectorHandleResolver;
18+
import com.facebook.presto.spi.NodeManager;
1719
import com.facebook.presto.spi.SystemTable;
1820
import com.facebook.presto.spi.connector.Connector;
1921
import com.facebook.presto.spi.connector.ConnectorContext;
@@ -34,13 +36,17 @@ public class GlobalSystemConnectorFactory
3436
private final Set<SystemTable> tables;
3537
private final Set<Procedure> procedures;
3638
private final Set<ConnectorTableFunction> tableFunctions;
39+
private final NodeManager nodeManager;
40+
private final FunctionAndTypeManager functionAndTypeManager;
3741

3842
@Inject
39-
public GlobalSystemConnectorFactory(Set<SystemTable> tables, Set<Procedure> procedures, Set<ConnectorTableFunction> tableFunctions)
43+
public GlobalSystemConnectorFactory(Set<SystemTable> tables, Set<Procedure> procedures, Set<ConnectorTableFunction> tableFunctions, NodeManager nodeManager, FunctionAndTypeManager functionAndTypeManager)
4044
{
4145
this.tables = ImmutableSet.copyOf(requireNonNull(tables, "tables is null"));
4246
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
4347
this.tableFunctions = ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null"));
48+
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
49+
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
4450
}
4551

4652
@Override
@@ -58,6 +64,6 @@ public ConnectorHandleResolver getHandleResolver()
5864
@Override
5965
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
6066
{
61-
return new GlobalSystemConnector(catalogName, tables, procedures, tableFunctions);
67+
return new GlobalSystemConnector(catalogName, tables, procedures, tableFunctions, nodeManager, functionAndTypeManager);
6268
}
6369
}

presto-main-base/src/main/java/com/facebook/presto/connector/system/SystemConnectorModule.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@
2727
import com.facebook.presto.connector.system.jdbc.TableTypeJdbcTable;
2828
import com.facebook.presto.connector.system.jdbc.TypesJdbcTable;
2929
import com.facebook.presto.connector.system.jdbc.UdtJdbcTable;
30+
import com.facebook.presto.metadata.FunctionAndTypeManager;
31+
import com.facebook.presto.nodeManager.PluginNodeManager;
3032
import com.facebook.presto.operator.table.ExcludeColumns;
3133
import com.facebook.presto.operator.table.Sequence;
34+
import com.facebook.presto.spi.NodeManager;
3235
import com.facebook.presto.spi.SystemTable;
3336
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
3437
import com.facebook.presto.spi.procedure.Procedure;
@@ -80,6 +83,9 @@ public void configure(Binder binder)
8083

8184
binder.bind(GlobalSystemConnectorFactory.class).in(Scopes.SINGLETON);
8285
binder.bind(SystemConnectorRegistrar.class).asEagerSingleton();
86+
binder.bind(PluginNodeManager.class).in(Scopes.SINGLETON);
87+
binder.bind(NodeManager.class).to(PluginNodeManager.class).in(Scopes.SINGLETON);
88+
binder.bind(FunctionAndTypeManager.class).in(Scopes.SINGLETON);
8389

8490
Multibinder<ConnectorTableFunction> tableFunctions = Multibinder.newSetBinder(binder, ConnectorTableFunction.class);
8591
tableFunctions.addBinding().toProvider(ExcludeColumns.class).in(Scopes.SINGLETON);

presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionAndTypeManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,12 @@ public void addTVFProviderFactory(TVFProviderFactory factory)
385385
throw new IllegalArgumentException(format("TVF provider '%s' is already registered", factory.getName()));
386386
}
387387
handleResolver.addTableFunctionNamespace(factory.getName(), factory.getTableFunctionHandleResolver());
388+
handleResolver.addTableFunctionSplitNamespace(factory.getName(), factory.getTableFunctionSplitResolver());
389+
}
390+
391+
public HandleResolver getHandleResolver()
392+
{
393+
return handleResolver;
388394
}
389395

390396
@Override

presto-main-base/src/main/java/com/facebook/presto/operator/PageBuffer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
package com.facebook.presto.operator;
1515

1616
import com.facebook.presto.common.Page;
17+
import com.facebook.presto.operator.WorkProcessor.ProcessState;
1718
import jakarta.annotation.Nullable;
1819

1920
import static com.facebook.presto.operator.WorkProcessor.ProcessState.finished;
2021
import static com.facebook.presto.operator.WorkProcessor.ProcessState.ofResult;
21-
import static com.facebook.presto.operator.WorkProcessor.ProcessState.yield;
2222
import static com.google.common.base.Preconditions.checkState;
2323
import static java.util.Objects.requireNonNull;
2424

@@ -41,7 +41,7 @@ public WorkProcessor<Page> pages()
4141
return ofResult(result);
4242
}
4343

44-
return yield();
44+
return ProcessState.yield();
4545
});
4646
}
4747

presto-main-base/src/main/java/com/facebook/presto/operator/project/PageProcessor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import static com.facebook.presto.common.block.DictionaryId.randomDictionaryId;
4444
import static com.facebook.presto.operator.WorkProcessor.ProcessState.finished;
4545
import static com.facebook.presto.operator.WorkProcessor.ProcessState.ofResult;
46-
import static com.facebook.presto.operator.WorkProcessor.ProcessState.yield;
4746
import static com.facebook.presto.operator.project.SelectedPositions.positionsRange;
4847
import static com.google.common.base.Preconditions.checkArgument;
4948
import static com.google.common.base.Verify.verify;
@@ -204,7 +203,7 @@ public ProcessState<Page> process()
204203
lastComputeYielded = true;
205204
lastComputeBatchSize = batchSize;
206205
updateRetainedSize();
207-
return yield();
206+
return ProcessState.yield();
208207
}
209208

210209
if (result.isPageTooLarge()) {

presto-main-base/src/main/java/com/facebook/presto/operator/table/Sequence.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.facebook.presto.spi.ConnectorSplitSource;
2121
import com.facebook.presto.spi.FixedSplitSource;
2222
import com.facebook.presto.spi.HostAddress;
23+
import com.facebook.presto.spi.NodeManager;
2324
import com.facebook.presto.spi.NodeProvider;
2425
import com.facebook.presto.spi.PrestoException;
2526
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
@@ -167,6 +168,12 @@ public long step()
167168
{
168169
return step;
169170
}
171+
172+
@Override
173+
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, NodeManager nodeManager, Object functionAndTypeManager)
174+
{
175+
return getSequenceFunctionSplitSource(this);
176+
}
170177
}
171178

172179
public static ConnectorSplitSource getSequenceFunctionSplitSource(SequenceFunctionHandle handle)

presto-main-base/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,9 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
522522
new AnalyzePropertiesSystemTable(transactionManager, metadata),
523523
new TransactionsSystemTable(metadata.getFunctionAndTypeManager(), transactionManager)),
524524
ImmutableSet.of(),
525-
ImmutableSet.of(new ExcludeColumnsFunction()));
525+
ImmutableSet.of(new ExcludeColumnsFunction()),
526+
null,
527+
getFunctionAndTypeManager());
526528

527529
BuiltInQueryAnalyzer queryAnalyzer = new BuiltInQueryAnalyzer(metadata, sqlParser, accessControl, Optional.empty(), metadataExtractorExecutor);
528530
BuiltInAnalyzerProvider analyzerProvider = new BuiltInAnalyzerProvider(queryAnalyzer);

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,18 @@ void PrestoServer::run() {
406406
util::extractMessageBody(body),
407407
server->nativeWorkerPool_.get()));
408408
});
409+
httpServer_->registerPost(
410+
"/v1/tvf/splits",
411+
[server = this](
412+
proxygen::HTTPMessage* message,
413+
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
414+
proxygen::ResponseHandler* downstream) {
415+
http::sendOkResponse(
416+
downstream,
417+
getSplits(
418+
util::extractMessageBody(body),
419+
server->nativeWorkerPool_.get()));
420+
});
409421

410422
if (systemConfig->enableRuntimeMetricsCollection()) {
411423
enableWorkerStatsReporting();

0 commit comments

Comments
 (0)