Skip to content

Commit d8c5809

Browse files
pdabre12xin-zhang2
authored andcommitted
Return fully qualified functionName and cleanup table function resolution logic
1 parent 7f701fa commit d8c5809

File tree

16 files changed

+303
-328
lines changed

16 files changed

+303
-328
lines changed

presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInTypeAndFunctionNamespaceManager.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,6 @@
236236
import com.facebook.presto.spi.function.SqlFunctionVisibility;
237237
import com.facebook.presto.spi.function.SqlInvokedFunction;
238238
import com.facebook.presto.spi.function.SqlInvokedScalarFunctionImplementation;
239-
import com.facebook.presto.spi.function.table.TableFunctionMetadata;
240-
import com.facebook.presto.spi.tvf.TVFProvider;
241239
import com.facebook.presto.sql.analyzer.FunctionsConfig;
242240
import com.facebook.presto.type.BigintOperators;
243241
import com.facebook.presto.type.BooleanOperators;
@@ -530,7 +528,7 @@
530528

531529
@ThreadSafe
532530
public class BuiltInTypeAndFunctionNamespaceManager
533-
implements FunctionNamespaceManager<SqlFunction>, TVFProvider, TypeManager
531+
implements FunctionNamespaceManager<SqlFunction>, TypeManager
534532
{
535533
public static final CatalogSchemaName JAVA_BUILTIN_NAMESPACE = new CatalogSchemaName("presto", "default");
536534
public static final String ID = "builtin";
@@ -1400,12 +1398,6 @@ public SpecializedFunctionKey doGetSpecializedFunctionKeyForMagicLiteralFunction
14001398
throw new PrestoException(FUNCTION_IMPLEMENTATION_MISSING, format("%s not found", signature));
14011399
}
14021400

1403-
@Override
1404-
public TableFunctionMetadata resolveTableFunction(String functionName)
1405-
{
1406-
return null;
1407-
}
1408-
14091401
private static class EmptyTransactionHandle
14101402
implements FunctionNamespaceTransactionHandle
14111403
{

presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionAndTypeManager.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import com.facebook.presto.spi.StandardErrorCode;
4040
import com.facebook.presto.spi.function.AggregationFunctionImplementation;
4141
import com.facebook.presto.spi.function.AlterRoutineCharacteristics;
42-
import com.facebook.presto.spi.function.CatalogSchemaFunctionName;
4342
import com.facebook.presto.spi.function.FunctionHandle;
4443
import com.facebook.presto.spi.function.FunctionMetadata;
4544
import com.facebook.presto.spi.function.FunctionMetadataManager;
@@ -108,7 +107,6 @@
108107
import static com.facebook.presto.metadata.FunctionSignatureMatcher.constructFunctionNotFoundErrorMessage;
109108
import static com.facebook.presto.metadata.SessionFunctionHandle.SESSION_NAMESPACE;
110109
import static com.facebook.presto.metadata.SignatureBinder.applyBoundVariables;
111-
import static com.facebook.presto.metadata.TableFunctionRegistry.toPath;
112110
import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
113111
import static com.facebook.presto.spi.StandardErrorCode.AMBIGUOUS_FUNCTION_CALL;
114112
import static com.facebook.presto.spi.StandardErrorCode.FUNCTION_IMPLEMENTATION_MISSING;
@@ -167,7 +165,6 @@ public class FunctionAndTypeManager
167165
private final ConcurrentHashMap<ConnectorId, Function<ConnectorTableFunctionHandle, TableFunctionProcessorProvider>> tableFunctionProcessorProviderMap = new ConcurrentHashMap<>();
168166
private final Map<String, TVFProviderFactory> tvfProviderFactories = new ConcurrentHashMap<>();
169167
private final Map<ConnectorId, TVFProvider> tvfProviders = new ConcurrentHashMap<>();
170-
private final AtomicReference<TVFProvider> servingTableFunctionsProvider;
171168

172169
@Inject
173170
public FunctionAndTypeManager(
@@ -204,7 +201,6 @@ public FunctionAndTypeManager(
204201
this.servingTypeManager = new AtomicReference<>(builtInTypeAndFunctionNamespaceManager);
205202
this.servingTypeManagerParametricTypesSupplier = new AtomicReference<>(this::getServingTypeManagerParametricTypes);
206203
this.builtInPluginFunctionNamespaceManager = new BuiltInPluginFunctionNamespaceManager(this);
207-
this.servingTableFunctionsProvider = new AtomicReference<>(builtInTypeAndFunctionNamespaceManager);
208204
}
209205

210206
public static FunctionAndTypeManager createTestFunctionAndTypeManager()
@@ -373,12 +369,11 @@ public void loadTVFProvider(String tvfProviderName, NodeManager nodeManager)
373369
requireNonNull(tvfProviderName, "tvfProviderName is null");
374370
TVFProviderFactory factory = tvfProviderFactories.get(tvfProviderName);
375371
checkState(factory != null, "No factory for tvf provider %s", tvfProviderName);
376-
TVFProvider tvfProvider = factory.createTVFProvider(tvfProviderName, ImmutableMap.of(), new TVFProviderContext(nodeManager, this));
372+
TVFProvider tvfProvider = factory.createTVFProvider(ImmutableMap.of(), new TVFProviderContext(nodeManager, this));
377373

378374
if (tvfProviders.putIfAbsent(new ConnectorId(tvfProviderName), tvfProvider) != null) {
379375
throw new IllegalArgumentException(format("TVF provider [%s] is already registered", tvfProvider));
380376
}
381-
servingTableFunctionsProvider.compareAndSet(servingTableFunctionsProvider.get(), tvfProvider);
382377
}
383378

384379
public void loadTVFProviders(NodeManager nodeManager)
@@ -498,19 +493,19 @@ public void addTypeManagerFactory(TypeManagerFactory factory)
498493

499494
public TableFunctionMetadata resolveTableFunction(Session session, QualifiedName qualifiedName)
500495
{
501-
// Fetch if it's a builtin function first
502-
TableFunctionMetadata tableFunctionMetadata =
503-
servingTableFunctionsProvider.get()
504-
.resolveTableFunction(
505-
getFunctionAndTypeResolver()
506-
.qualifyObjectName(qualifiedName).getObjectName());
507-
if (tableFunctionMetadata == null) {
508-
// populate the registry before trying to resolve the table functions from table functions provider
509-
List<CatalogSchemaFunctionName> name = toPath(session, qualifiedName);
510-
ConnectorId connectorId = new ConnectorId(name.get(0).getCatalogName());
511-
return tableFunctionRegistry.resolve(connectorId, name.get(0));
496+
// Before resolving the table function, add all the TVF provider's table functions to the function registry.
497+
if (!tableFunctionRegistry.areTvfProviderFunctionsLoaded()) {
498+
for (ConnectorId connectorId : tvfProviders.keySet()) {
499+
// In terms of the NativeTVFProvider, you want it to act similarly to the system connector table functions, hence we replace the Java loaded system connector table functions.
500+
// This is only enforced when native execution is enabled and presto-native-tvf module is loaded.
501+
if (connectorId.getCatalogName().equals("system")) {
502+
tableFunctionRegistry.removeTableFunctions(connectorId);
503+
}
504+
tableFunctionRegistry.addTableFunctions(connectorId, tvfProviders.get(connectorId).getTableFunctions());
505+
}
506+
tableFunctionRegistry.updateTvfProviderFunctionsLoaded();
512507
}
513-
return tableFunctionMetadata;
508+
return tableFunctionRegistry.resolve(session, qualifiedName);
514509
}
515510

516511
public TransactionManager getTransactionManager()

presto-main-base/src/main/java/com/facebook/presto/metadata/TableFunctionRegistry.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,13 @@
3636
import java.util.Map;
3737
import java.util.Set;
3838
import java.util.concurrent.ConcurrentHashMap;
39+
import java.util.concurrent.atomic.AtomicBoolean;
3940

40-
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_USER_ERROR;
4141
import static com.facebook.presto.spi.StandardErrorCode.MISSING_CATALOG_NAME;
4242
import static com.facebook.presto.spi.function.table.Preconditions.checkArgument;
4343
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.CATALOG_NOT_SPECIFIED;
4444
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.SCHEMA_NOT_SPECIFIED;
4545
import static com.google.common.base.Preconditions.checkState;
46-
import static java.lang.String.format;
4746
import static java.util.Locale.ENGLISH;
4847
import static java.util.Objects.requireNonNull;
4948

@@ -52,6 +51,7 @@ public class TableFunctionRegistry
5251
{
5352
// catalog name in the original case; schema and function name in lowercase
5453
private final Map<ConnectorId, Map<SchemaFunctionName, TableFunctionMetadata>> tableFunctions = new ConcurrentHashMap<>();
54+
private final AtomicBoolean tvfProviderFunctionsLoaded = new AtomicBoolean(false);
5555

5656
public void addTableFunctions(ConnectorId catalogName, Collection<ConnectorTableFunction> functions)
5757
{
@@ -77,9 +77,14 @@ public void removeTableFunctions(ConnectorId catalogName)
7777
tableFunctions.remove(catalogName);
7878
}
7979

80-
public boolean areTableFunctionsLoaded(ConnectorId catalogName)
80+
public boolean areTvfProviderFunctionsLoaded()
8181
{
82-
return tableFunctions.containsKey(catalogName);
82+
return tvfProviderFunctionsLoaded.get();
83+
}
84+
85+
public void updateTvfProviderFunctionsLoaded()
86+
{
87+
tvfProviderFunctionsLoaded.compareAndSet(false, true);
8388
}
8489

8590
public static List<CatalogSchemaFunctionName> toPath(Session session, QualifiedName name)
@@ -117,19 +122,22 @@ public static List<CatalogSchemaFunctionName> toPath(Session session, QualifiedN
117122
* Resolve table function with given qualified name.
118123
* Table functions are resolved case-insensitive for consistency with existing scalar function resolution.
119124
*/
120-
public TableFunctionMetadata resolve(ConnectorId connectorId, CatalogSchemaFunctionName name)
125+
public TableFunctionMetadata resolve(Session session, QualifiedName qualifiedName)
121126
{
122-
Map<SchemaFunctionName, TableFunctionMetadata> catalogFunctions = tableFunctions.get(connectorId);
123-
if (catalogFunctions != null) {
124-
String lowercasedSchemaName = name.getSchemaFunctionName().getSchemaName().toLowerCase(ENGLISH);
125-
String lowercasedFunctionName = name.getSchemaFunctionName().getFunctionName().toLowerCase(ENGLISH);
126-
TableFunctionMetadata function = catalogFunctions.get(new SchemaFunctionName(lowercasedSchemaName, lowercasedFunctionName));
127-
if (function != null) {
128-
return function;
127+
for (CatalogSchemaFunctionName name : toPath(session, qualifiedName)) {
128+
ConnectorId connectorId = new ConnectorId(name.getCatalogName());
129+
Map<SchemaFunctionName, TableFunctionMetadata> catalogFunctions = tableFunctions.get(connectorId);
130+
if (catalogFunctions != null) {
131+
String lowercasedSchemaName = name.getSchemaFunctionName().getSchemaName().toLowerCase(ENGLISH);
132+
String lowercasedFunctionName = name.getSchemaFunctionName().getFunctionName().toLowerCase(ENGLISH);
133+
TableFunctionMetadata function = catalogFunctions.get(new SchemaFunctionName(lowercasedSchemaName, lowercasedFunctionName));
134+
if (function != null) {
135+
return function;
136+
}
129137
}
130138
}
131139

132-
throw new PrestoException(GENERIC_USER_ERROR, format("Table functions for catalog %s could not be resolved.", connectorId.getCatalogName()));
140+
return null;
133141
}
134142

135143
private static void validateTableFunction(ConnectorTableFunction tableFunction)

presto-native-execution/presto_cpp/main/types/FunctionMetadata.cpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -423,19 +423,18 @@ json getTableValuedFunctionsMetadata() {
423423
// Get metadata for all registered table valued functions in velox.
424424
const auto signatures = tableFunctions();
425425
for (const auto& entry : signatures) {
426+
// Just a check to see if it's a fully qualified name
426427
const auto parts = getFunctionNameParts(entry.first);
427-
const auto schema = parts[1];
428428
const auto functionName = parts[2];
429429

430-
protocol::AbstractConnectorTableFunction function;
430+
protocol::JsonBasedTableFunctionMetadata jsonBasedTableFunctionMetadata;
431431
json tj;
432-
function.name = functionName;
433-
function.schema = schema;
434-
function.returnTypeSpecification =
432+
jsonBasedTableFunctionMetadata.functionName = entry.first;
433+
jsonBasedTableFunctionMetadata.returnTypeSpecification =
435434
buildReturnTypeSpecification(getTableFunctionReturnType(entry.first));
436-
function.arguments =
435+
jsonBasedTableFunctionMetadata.arguments =
437436
buildArgumentSpecsList(getTableFunctionArgumentSpecs(entry.first));
438-
protocol::to_json(tj, function);
437+
protocol::to_json(tj, jsonBasedTableFunctionMetadata);
439438
j[functionName] = tj;
440439
}
441440
return j;
@@ -456,9 +455,10 @@ getRequiredColumns(const tvf::TableFunctionAnalysis* tableFunctionAnalysis) {
456455
}
457456

458457
protocol::NativeTableFunctionHandle buildNativeTableFunctionHandle(
459-
const TableFunctionHandlePtr tableFunctionHandle) {
458+
const TableFunctionHandlePtr tableFunctionHandle,
459+
std::string functionName) {
460460
protocol::NativeTableFunctionHandle handle;
461-
handle.functionName = tableFunctionHandle->name();
461+
handle.functionName = functionName;
462462
handle.serializedTableFunctionHandle =
463463
folly::toJson(tableFunctionHandle->serialize());
464464
return handle;
@@ -475,7 +475,7 @@ protocol::NativeTableFunctionAnalysis getNativeTableFunctionAnalysis(
475475
std::make_shared<protocol::NativeDescriptor>(
476476
buildNativeDescriptor(*tableFunctionAnalysis->returnType()));
477477
nativeTableFunctionAnalysis.handle = buildNativeTableFunctionHandle(
478-
tableFunctionAnalysis->tableFunctionHandle());
478+
tableFunctionAnalysis->tableFunctionHandle(), functionName);
479479
return nativeTableFunctionAnalysis;
480480
}
481481

@@ -484,7 +484,7 @@ json getAnalyzedTableValueFunction(
484484
velox::memory::MemoryPool* pool) {
485485
TypeParser parser;
486486
VeloxExprConverter exprConverter_{pool, &parser};
487-
protocol::ConnectorTableMetadata1 connectorTableMetadata =
487+
protocol::ConnectorTableMetadata connectorTableMetadata =
488488
json::parse(connectorTableMetadataJson);
489489
std::unordered_map<std::string, std::shared_ptr<tvf::Argument>> args;
490490
for (const auto& entry : connectorTableMetadata.arguments) {
@@ -527,11 +527,11 @@ json getAnalyzedTableValueFunction(
527527
functionArg = std::make_shared<tvf::Descriptor>(
528528
std::move(fieldNames), std::move(fieldTypes));
529529
} else {
530-
VELOX_FAIL("Failed to convert to a valid Argument");
530+
VELOX_UNSUPPORTED("Failed to convert to a valid Argument");
531531
}
532532
args[entry.first] = functionArg;
533533
}
534-
return json(
535-
getNativeTableFunctionAnalysis(connectorTableMetadata.name, args));
534+
return json(getNativeTableFunctionAnalysis(
535+
connectorTableMetadata.functionName, args));
536536
}
537537
} // namespace facebook::presto

0 commit comments

Comments
 (0)