Skip to content

Commit a4882d6

Browse files
committed
Add temporary code for test
1 parent 8a09cca commit a4882d6

File tree

9 files changed

+150
-8
lines changed

9 files changed

+150
-8
lines changed

presto-hive/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,12 @@
172172
<scope>test</scope>
173173
</dependency>
174174

175+
<dependency>
176+
<groupId>com.facebook.presto</groupId>
177+
<artifactId>presto-native-tvf</artifactId>
178+
<scope>test</scope>
179+
</dependency>
180+
175181
<dependency>
176182
<groupId>com.google.guava</groupId>
177183
<artifactId>guava</artifactId>

presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.facebook.presto.tests.tpcds.TpcdsTableName;
3737
import com.facebook.presto.tpcds.TpcdsPlugin;
3838
import com.facebook.presto.tpch.TpchPlugin;
39+
import com.facebook.presto.tvf.TvfPlugin;
3940
import com.google.common.collect.ImmutableList;
4041
import com.google.common.collect.ImmutableMap;
4142
import com.google.common.collect.ImmutableSet;
@@ -244,6 +245,9 @@ public static DistributedQueryRunner createQueryRunner(
244245

245246
queryRunner.installPlugin(new HivePlugin(HIVE_CATALOG, Optional.of(metastore)));
246247

248+
queryRunner.installCoordinatorPlugin(new TvfPlugin());
249+
queryRunner.loadTVFProvider("system");
250+
247251
if (addJmxPlugin) {
248252
queryRunner.installPlugin(new JmxPlugin());
249253
queryRunner.createCatalog("jmx", "jmx");

presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@
135135
import com.facebook.presto.sql.planner.iterative.rule.RewriteCaseExpressionPredicate;
136136
import com.facebook.presto.sql.planner.iterative.rule.RewriteCaseToMap;
137137
import com.facebook.presto.sql.planner.iterative.rule.RewriteConstantArrayContainsToInExpression;
138-
import com.facebook.presto.sql.planner.iterative.rule.RewriteExcludeColumnsFunctionToProjection;
139138
import com.facebook.presto.sql.planner.iterative.rule.RewriteFilterWithExternalFunctionToProject;
140139
import com.facebook.presto.sql.planner.iterative.rule.RewriteSpatialPartitioningAggregation;
141140
import com.facebook.presto.sql.planner.iterative.rule.RewriteTableFunctionToTableScan;
@@ -870,7 +869,8 @@ public PlanOptimizers(
870869
ruleStats,
871870
statsCalculator,
872871
costCalculator,
873-
ImmutableSet.of(new RewriteTableFunctionToTableScan(metadata), new RewriteExcludeColumnsFunctionToProjection())));
872+
ImmutableSet.of(new RewriteTableFunctionToTableScan(metadata))));
873+
//ImmutableSet.of(new RewriteTableFunctionToTableScan(metadata), new RewriteExcludeColumnsFunctionToProjection())));
874874

875875
if (!noExchange) {
876876
builder.add(new ReplicateSemiJoinInDelete()); // Must run before AddExchanges

presto-native-execution/presto_cpp/main/tvf/tests/PlanBuilder.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ addTvfNode(
7474
// This can't be directly used like this. The TableFunction is planned
7575
// with a single join across all tables, so this doesn't translate the
7676
// same way.
77-
analysis->requiredColumns(),
77+
std::vector<column_index_t>{},
78+
// analysis->requiredColumns(),
7879
sources);
7980
};
8081
}

presto-native-execution/presto_cpp/main/types/FunctionMetadata.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -524,10 +524,12 @@ json getAnalyzedTableValueFunction(
524524
std::vector<velox::TypePtr> fieldTypes;
525525
for (auto& arg : descriptorArgument->descriptor->fields) {
526526
fieldNames.push_back(boost::algorithm::to_lower_copy(*arg.name));
527-
fieldTypes.push_back(parser.parse(*arg.type));
527+
// fieldTypes.push_back(parser.parse(*arg.type));
528528
}
529529
functionArg = std::make_shared<tvf::Descriptor>(
530-
std::move(fieldNames), std::move(fieldTypes));
530+
std::move(fieldNames)
531+
// , std::move(fieldTypes)
532+
);
531533
} else {
532534
VELOX_UNSUPPORTED("Failed to convert to a valid Argument");
533535
}

presto-native-tvf/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@
6565
<scope>provided</scope>
6666
</dependency>
6767

68+
<dependency>
69+
<groupId>com.fasterxml.jackson.core</groupId>
70+
<artifactId>jackson-databind</artifactId>
71+
</dependency>
72+
6873
<dependency>
6974
<groupId>com.facebook.airlift</groupId>
7075
<artifactId>json</artifactId>
@@ -90,5 +95,9 @@
9095
<groupId>com.facebook.airlift</groupId>
9196
<artifactId>http-client</artifactId>
9297
</dependency>
98+
<dependency>
99+
<groupId>com.facebook.presto</groupId>
100+
<artifactId>presto-main-base</artifactId>
101+
</dependency>
93102
</dependencies>
94103
</project>

presto-native-tvf/src/main/java/com/facebook/presto/tvf/NativeConnectorTableFunction.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616
import com.facebook.airlift.http.client.HttpClient;
1717
import com.facebook.airlift.http.client.Request;
1818
import com.facebook.airlift.json.JsonCodec;
19+
import com.facebook.airlift.json.JsonCodecFactory;
20+
import com.facebook.airlift.json.JsonObjectMapperProvider;
21+
import com.facebook.presto.block.BlockJsonSerde.Serializer;
1922
import com.facebook.presto.common.QualifiedObjectName;
23+
import com.facebook.presto.common.block.Block;
24+
import com.facebook.presto.common.block.BlockEncodingManager;
2025
import com.facebook.presto.common.type.TypeManager;
2126
import com.facebook.presto.spi.ConnectorSession;
2227
import com.facebook.presto.spi.NodeManager;
@@ -27,6 +32,9 @@
2732
import com.facebook.presto.spi.function.table.ArgumentSpecification;
2833
import com.facebook.presto.spi.function.table.ReturnTypeSpecification;
2934
import com.facebook.presto.spi.function.table.TableFunctionAnalysis;
35+
import com.fasterxml.jackson.databind.ObjectMapper;
36+
import com.fasterxml.jackson.databind.SerializationFeature;
37+
import com.google.common.collect.ImmutableMap;
3038

3139
import java.util.List;
3240
import java.util.Map;
@@ -48,11 +56,22 @@ public class NativeConnectorTableFunction
4856
private final NodeManager nodeManager;
4957
private final TypeManager typeManager;
5058
private static final String TVF_ANALYZE_ENDPOINT = "/v1/tvf/analyze";
51-
private static final JsonCodec<ConnectorTableMetadata> connectorTableMetadataJsonCodec = JsonCodec.jsonCodec(ConnectorTableMetadata.class);
59+
private static final JsonCodec<ConnectorTableMetadata> connectorTableMetadataJsonCodec;
5260
private static final JsonCodec<NativeTableFunctionAnalysis> tableFunctionAnalysisJsonCodec =
5361
JsonCodec.jsonCodec(NativeTableFunctionAnalysis.class);
5462
private final QualifiedObjectName functionName;
5563

64+
static {
65+
JsonObjectMapperProvider provider = new JsonObjectMapperProvider();
66+
provider.setJsonSerializers(ImmutableMap.of(
67+
Block.class, new Serializer(new BlockEncodingManager())));
68+
69+
ObjectMapper mapper = provider.get();
70+
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
71+
JsonCodecFactory codecFactory = new JsonCodecFactory(provider);
72+
connectorTableMetadataJsonCodec = codecFactory.jsonCodec(ConnectorTableMetadata.class);
73+
}
74+
5675
public NativeConnectorTableFunction(
5776
@ForWorkerInfo HttpClient httpClient,
5877
NodeManager nodeManager,

presto-native-tvf/src/main/java/com/facebook/presto/tvf/NativeTVFProvider.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,22 @@
1717
import com.facebook.airlift.http.client.HttpUriBuilder;
1818
import com.facebook.airlift.http.client.Request;
1919
import com.facebook.airlift.json.JsonCodec;
20+
import com.facebook.airlift.json.JsonCodecFactory;
21+
import com.facebook.airlift.json.JsonObjectMapperProvider;
22+
import com.facebook.presto.common.type.Type;
2023
import com.facebook.presto.common.type.TypeManager;
2124
import com.facebook.presto.spi.Node;
2225
import com.facebook.presto.spi.NodeManager;
2326
import com.facebook.presto.spi.PrestoException;
2427
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
2528
import com.facebook.presto.spi.tvf.TVFProvider;
29+
import com.fasterxml.jackson.databind.DeserializationContext;
30+
import com.fasterxml.jackson.databind.ObjectMapper;
31+
import com.fasterxml.jackson.databind.SerializationFeature;
32+
import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
2633
import com.google.common.base.Suppliers;
2734
import com.google.common.collect.ImmutableList;
35+
import com.google.common.collect.ImmutableMap;
2836
import com.google.common.collect.Iterables;
2937

3038
import javax.inject.Inject;
@@ -39,6 +47,7 @@
3947

4048
import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
4149
import static com.facebook.airlift.http.client.Request.Builder.prepareGet;
50+
import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
4251
import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
4352
import static java.util.Objects.requireNonNull;
4453

@@ -49,8 +58,7 @@ public class NativeTVFProvider
4958
private final TypeManager typeManager;
5059
private final HttpClient httpClient;
5160
private static final String TABLE_FUNCTIONS_ENDPOINT = "/v1/functions/tvf";
52-
private static final JsonCodec<Map<String, JsonBasedTableFunctionMetadata>> connectorTableFunctionListJsonCodec =
53-
JsonCodec.mapJsonCodec(String.class, JsonBasedTableFunctionMetadata.class);
61+
private final JsonCodec<Map<String, JsonBasedTableFunctionMetadata>> connectorTableFunctionListJsonCodec;
5462
private final Supplier<List<ConnectorTableFunction>> memoizedTableFunctionsSupplier;
5563

5664
@Inject
@@ -64,6 +72,16 @@ public NativeTVFProvider(
6472
this.httpClient = requireNonNull(httpClient, "httpClient is null");
6573
this.memoizedTableFunctionsSupplier = Suppliers.memoizeWithExpiration(this::loadConnectorTableFunctions,
6674
100000, TimeUnit.MILLISECONDS);
75+
76+
JsonObjectMapperProvider provider = new JsonObjectMapperProvider();
77+
78+
provider.setJsonDeserializers(ImmutableMap.of(
79+
Type.class, new TypeDeserializer(typeManager)));
80+
81+
ObjectMapper mapper = provider.get();
82+
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
83+
JsonCodecFactory codecFactory = new JsonCodecFactory(provider);
84+
this.connectorTableFunctionListJsonCodec = codecFactory.mapJsonCodec(String.class, JsonBasedTableFunctionMetadata.class);
6785
}
6886

6987
@Override
@@ -108,4 +126,23 @@ private synchronized NativeConnectorTableFunction createNativeConnectorTableFunc
108126
connectorTableFunction.getArguments(),
109127
connectorTableFunction.getReturnTypeSpecification());
110128
}
129+
130+
public static final class TypeDeserializer
131+
extends FromStringDeserializer<Type>
132+
{
133+
private final TypeManager typeManager;
134+
135+
@Inject
136+
public TypeDeserializer(TypeManager typeManager)
137+
{
138+
super(Type.class);
139+
this.typeManager = requireNonNull(typeManager, "typeManager is null");
140+
}
141+
142+
@Override
143+
protected Type _deserialize(String value, DeserializationContext context)
144+
{
145+
return typeManager.getType(parseTypeSignature(value));
146+
}
147+
}
111148
}

presto-native-tvf/src/main/java/com/facebook/presto/tvf/NativeTableFunctionHandle.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,44 @@
1313
*/
1414
package com.facebook.presto.tvf;
1515

16+
import com.facebook.airlift.json.JsonCodec;
17+
import com.facebook.airlift.json.JsonCodecFactory;
1618
import com.facebook.presto.common.QualifiedObjectName;
19+
import com.facebook.presto.index.IndexHandleJacksonModule;
20+
import com.facebook.presto.metadata.ColumnHandleJacksonModule;
21+
import com.facebook.presto.metadata.DeleteTableHandleJacksonModule;
22+
import com.facebook.presto.metadata.FunctionAndTypeManager;
23+
import com.facebook.presto.metadata.FunctionHandleJacksonModule;
24+
import com.facebook.presto.metadata.HandleResolver;
25+
import com.facebook.presto.metadata.InsertTableHandleJacksonModule;
26+
import com.facebook.presto.metadata.OutputTableHandleJacksonModule;
27+
import com.facebook.presto.metadata.PartitioningHandleJacksonModule;
28+
import com.facebook.presto.metadata.SplitJacksonModule;
29+
import com.facebook.presto.metadata.TableFunctionJacksonHandleModule;
30+
import com.facebook.presto.metadata.TableHandleJacksonModule;
31+
import com.facebook.presto.metadata.TableLayoutHandleJacksonModule;
32+
import com.facebook.presto.metadata.TransactionHandleJacksonModule;
33+
import com.facebook.presto.spi.ConnectorSession;
34+
import com.facebook.presto.spi.ConnectorSplitSource;
35+
import com.facebook.presto.spi.FixedSplitSource;
36+
import com.facebook.presto.spi.NodeManager;
37+
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
1738
import com.facebook.presto.spi.function.TableFunctionHandleResolver;
1839
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
1940
import com.fasterxml.jackson.annotation.JsonCreator;
2041
import com.fasterxml.jackson.annotation.JsonProperty;
42+
import com.fasterxml.jackson.databind.ObjectMapper;
2143
import com.google.common.collect.ImmutableSet;
2244

2345
import java.util.Set;
2446

47+
import static com.facebook.airlift.http.client.JsonBodyGenerator.jsonBodyGenerator;
48+
import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
49+
import static com.facebook.airlift.http.client.Request.Builder.preparePost;
50+
import static com.facebook.presto.tvf.NativeTVFProvider.getWorkerLocation;
51+
import static com.google.common.net.HttpHeaders.ACCEPT;
52+
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
53+
import static com.google.common.net.MediaType.JSON_UTF_8;
2554
import static java.util.Objects.requireNonNull;
2655

2756
public class NativeTableFunctionHandle
@@ -62,4 +91,39 @@ public Set<Class<? extends ConnectorTableFunctionHandle>> getTableFunctionHandle
6291
return ImmutableSet.of(NativeTableFunctionHandle.class);
6392
}
6493
}
94+
95+
@Override
96+
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, NodeManager nodeManager, Object functionAndTypeManager)
97+
{
98+
if (functionAndTypeManager instanceof FunctionAndTypeManager) {
99+
ObjectMapper objectMapper = new ObjectMapper();
100+
HandleResolver handleResolver = ((FunctionAndTypeManager) functionAndTypeManager).getHandleResolver();
101+
objectMapper.registerModule(new TableHandleJacksonModule(handleResolver));
102+
objectMapper.registerModule(new TableLayoutHandleJacksonModule(handleResolver));
103+
objectMapper.registerModule(new ColumnHandleJacksonModule(handleResolver));
104+
objectMapper.registerModule(new SplitJacksonModule(handleResolver));
105+
objectMapper.registerModule(new OutputTableHandleJacksonModule(handleResolver));
106+
objectMapper.registerModule(new InsertTableHandleJacksonModule(handleResolver));
107+
objectMapper.registerModule(new DeleteTableHandleJacksonModule(handleResolver));
108+
objectMapper.registerModule(new IndexHandleJacksonModule(handleResolver));
109+
objectMapper.registerModule(new TransactionHandleJacksonModule(handleResolver));
110+
objectMapper.registerModule(new PartitioningHandleJacksonModule(handleResolver));
111+
objectMapper.registerModule(new FunctionHandleJacksonModule(handleResolver));
112+
objectMapper.registerModule(new TableFunctionJacksonHandleModule(handleResolver));
113+
JsonCodecFactory jsonCodecFactory = new JsonCodecFactory(() -> objectMapper);
114+
JsonCodec<ConnectorTableFunctionHandle> nativeTableFunctionHandleCodec = jsonCodecFactory.jsonCodec(ConnectorTableFunctionHandle.class);
115+
116+
return new FixedSplitSource(
117+
HttpClientHolder.getHttpClient().execute(
118+
preparePost()
119+
.setUri(getWorkerLocation(nodeManager, TVF_SPLITS_ENDPOINT))
120+
.setBodyGenerator(jsonBodyGenerator(nativeTableFunctionHandleCodec, this))
121+
.setHeader(CONTENT_TYPE, JSON_UTF_8.toString())
122+
.setHeader(ACCEPT, JSON_UTF_8.toString())
123+
.build(),
124+
createJsonResponseHandler(JsonCodec.listJsonCodec(NativeTableFunctionSplit.class))));
125+
}
126+
127+
throw new UnsupportedOperationException();
128+
}
65129
}

0 commit comments

Comments
 (0)