Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle function)
{
return function.getSplits(transaction, session, nodeManager, functionAndTypeManager);
return function.getSplits(transaction, session, nodeManager);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public long step()
}

@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, NodeManager nodeManager, Object functionAndTypeManager)
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, NodeManager nodeManager)
{
return getSequenceFunctionSplitSource(this);
}
Expand Down
10 changes: 6 additions & 4 deletions presto-native-tvf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<!-- Presto SPI -->
<dependency>
<groupId>com.facebook.presto</groupId>
Expand All @@ -91,13 +96,10 @@
<groupId>com.facebook.airlift</groupId>
<artifactId>bootstrap</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main-base</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.JsonCodecFactory;
import com.facebook.airlift.json.JsonObjectMapperProvider;
import com.facebook.presto.block.BlockJsonSerde.Serializer;
import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockEncodingManager;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.NodeManager;
Expand All @@ -32,21 +32,29 @@
import com.facebook.presto.spi.function.table.ArgumentSpecification;
import com.facebook.presto.spi.function.table.ReturnTypeSpecification;
import com.facebook.presto.spi.function.table.TableFunctionAnalysis;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.core.Base64Variants;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static com.facebook.airlift.http.client.JsonBodyGenerator.jsonBodyGenerator;
import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
import static com.facebook.airlift.http.client.Request.Builder.preparePost;
import static com.facebook.presto.common.block.BlockSerdeUtil.writeBlock;
import static com.facebook.presto.spi.StandardErrorCode.TABLE_FUNCTION_ANALYSIS_FAILED;
import static com.facebook.presto.tvf.NativeTVFProvider.getWorkerLocation;
import static com.google.common.net.HttpHeaders.ACCEPT;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
import static com.google.common.net.MediaType.JSON_UTF_8;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

public class NativeConnectorTableFunction
Expand All @@ -64,10 +72,8 @@ public class NativeConnectorTableFunction
static {
JsonObjectMapperProvider provider = new JsonObjectMapperProvider();
provider.setJsonSerializers(ImmutableMap.of(
Block.class, new Serializer(new BlockEncodingManager())));
Block.class, new BlockSerializer(new BlockEncodingManager())));

ObjectMapper mapper = provider.get();
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
JsonCodecFactory codecFactory = new JsonCodecFactory(provider);
connectorTableMetadataJsonCodec = codecFactory.jsonCodec(ConnectorTableMetadata.class);
}
Expand Down Expand Up @@ -110,4 +116,26 @@ private Request getWorkerRequest(Map<String, Argument> arguments)
.setHeader(ACCEPT, JSON_UTF_8.toString())
.build();
}

private static class BlockSerializer
extends JsonSerializer<Block>
{
private final BlockEncodingSerde blockEncodingSerde;

public BlockSerializer(BlockEncodingSerde blockEncodingSerde)
{
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
}

@Override
public void serialize(Block block, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
throws IOException
{
// Encoding name is length prefixed as are other block data encodings
SliceOutput output = new DynamicSliceOutput(toIntExact(block.getSizeInBytes() + block.getEncodingName().length() + (2 * Integer.BYTES)));
writeBlock(blockEncodingSerde, output, block);
Slice slice = output.slice();
jsonGenerator.writeBinary(Base64Variants.MIME_NO_LINEFEEDS, slice.byteArray(), slice.byteArrayOffset(), slice.length());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.facebook.presto.spi.tvf.TVFProvider;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -74,12 +72,9 @@ public NativeTVFProvider(
100000, TimeUnit.MILLISECONDS);

JsonObjectMapperProvider provider = new JsonObjectMapperProvider();

provider.setJsonDeserializers(ImmutableMap.of(
Type.class, new TypeDeserializer(typeManager)));

ObjectMapper mapper = provider.get();
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
JsonCodecFactory codecFactory = new JsonCodecFactory(provider);
this.connectorTableFunctionListJsonCodec = codecFactory.mapJsonCodec(String.class, JsonBasedTableFunctionMetadata.class);
}
Expand Down Expand Up @@ -127,7 +122,7 @@ private synchronized NativeConnectorTableFunction createNativeConnectorTableFunc
connectorTableFunction.getReturnTypeSpecification());
}

public static final class TypeDeserializer
private static final class TypeDeserializer
extends FromStringDeserializer<Type>
{
private final TypeManager typeManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class NativeTVFProviderFactory
implements TVFProviderFactory
{
private static final String NAME = "system";
protected static final String NAME = "system";

private static final NativeTableFunctionHandle.Resolver HANDLE_RESOLVER = new NativeTableFunctionHandle.Resolver();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,9 @@
*/
package com.facebook.presto.tvf;

import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.JsonCodecFactory;
import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.index.IndexHandleJacksonModule;
import com.facebook.presto.metadata.ColumnHandleJacksonModule;
import com.facebook.presto.metadata.DeleteTableHandleJacksonModule;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.FunctionHandleJacksonModule;
import com.facebook.presto.metadata.HandleResolver;
import com.facebook.presto.metadata.InsertTableHandleJacksonModule;
import com.facebook.presto.metadata.OutputTableHandleJacksonModule;
import com.facebook.presto.metadata.PartitioningHandleJacksonModule;
import com.facebook.presto.metadata.SplitJacksonModule;
import com.facebook.presto.metadata.TableFunctionJacksonHandleModule;
import com.facebook.presto.metadata.TableHandleJacksonModule;
import com.facebook.presto.metadata.TableLayoutHandleJacksonModule;
import com.facebook.presto.metadata.TransactionHandleJacksonModule;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.FixedSplitSource;
Expand All @@ -39,14 +25,14 @@
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

import static com.facebook.airlift.http.client.JsonBodyGenerator.jsonBodyGenerator;
import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
import static com.facebook.airlift.http.client.Request.Builder.preparePost;
import static com.facebook.presto.tvf.HttpClientHolder.getHttpClient;
import static com.facebook.presto.tvf.NativeTVFProvider.getWorkerLocation;
import static com.google.common.net.HttpHeaders.ACCEPT;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
Expand Down Expand Up @@ -82,6 +68,31 @@ public QualifiedObjectName getFunctionName()
return functionName;
}

@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, NodeManager nodeManager)
{
return new FixedSplitSource(
getHttpClient().execute(
prepareSplitsPostRequest(nodeManager, this),
createJsonResponseHandler(JsonCodec.listJsonCodec(NativeTableFunctionSplit.class))));
}

private static Request prepareSplitsPostRequest(NodeManager nodeManager, NativeTableFunctionHandle nativeTableFunctionHandle)
{
return preparePost()
.setUri(getWorkerLocation(nodeManager, TVF_SPLITS_ENDPOINT))
.setBodyGenerator(jsonBodyGenerator(
JsonCodec.jsonCodec(ManualNativeTableFunctionHandleJsonHandler.class),
new ManualNativeTableFunctionHandleJsonHandler(
// The handle resolver for TVF providers is always the factory name suffixed by a colon.
NativeTVFProviderFactory.NAME + ":" + NativeTableFunctionHandle.class.getName(),
nativeTableFunctionHandle.serializedTableFunctionHandle,
nativeTableFunctionHandle.functionName)))
.setHeader(CONTENT_TYPE, JSON_UTF_8.toString())
.setHeader(ACCEPT, JSON_UTF_8.toString())
.build();
}

public static class Resolver
implements TableFunctionHandleResolver
{
Expand All @@ -92,38 +103,40 @@ public Set<Class<? extends ConnectorTableFunctionHandle>> getTableFunctionHandle
}
}

@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, NodeManager nodeManager, Object functionAndTypeManager)
// todo: Hacky way to manually convert a NativeTableFunctionHandle JSON
protected static class ManualNativeTableFunctionHandleJsonHandler
{
if (functionAndTypeManager instanceof FunctionAndTypeManager) {
ObjectMapper objectMapper = new ObjectMapper();
HandleResolver handleResolver = ((FunctionAndTypeManager) functionAndTypeManager).getHandleResolver();
objectMapper.registerModule(new TableHandleJacksonModule(handleResolver));
objectMapper.registerModule(new TableLayoutHandleJacksonModule(handleResolver));
objectMapper.registerModule(new ColumnHandleJacksonModule(handleResolver));
objectMapper.registerModule(new SplitJacksonModule(handleResolver));
objectMapper.registerModule(new OutputTableHandleJacksonModule(handleResolver));
objectMapper.registerModule(new InsertTableHandleJacksonModule(handleResolver));
objectMapper.registerModule(new DeleteTableHandleJacksonModule(handleResolver));
objectMapper.registerModule(new IndexHandleJacksonModule(handleResolver));
objectMapper.registerModule(new TransactionHandleJacksonModule(handleResolver));
objectMapper.registerModule(new PartitioningHandleJacksonModule(handleResolver));
objectMapper.registerModule(new FunctionHandleJacksonModule(handleResolver));
objectMapper.registerModule(new TableFunctionJacksonHandleModule(handleResolver));
JsonCodecFactory jsonCodecFactory = new JsonCodecFactory(() -> objectMapper);
JsonCodec<ConnectorTableFunctionHandle> nativeTableFunctionHandleCodec = jsonCodecFactory.jsonCodec(ConnectorTableFunctionHandle.class);

return new FixedSplitSource(
HttpClientHolder.getHttpClient().execute(
preparePost()
.setUri(getWorkerLocation(nodeManager, TVF_SPLITS_ENDPOINT))
.setBodyGenerator(jsonBodyGenerator(nativeTableFunctionHandleCodec, this))
.setHeader(CONTENT_TYPE, JSON_UTF_8.toString())
.setHeader(ACCEPT, JSON_UTF_8.toString())
.build(),
createJsonResponseHandler(JsonCodec.listJsonCodec(NativeTableFunctionSplit.class))));
private final String type;
private final String serializedTableFunctionHandle;
private final QualifiedObjectName functionName;

@JsonCreator
public ManualNativeTableFunctionHandleJsonHandler(
@JsonProperty("@type") String type,
@JsonProperty("serializedTableFunctionHandle") String serializedTableFunctionHandle,
@JsonProperty("functionName") QualifiedObjectName functionName)
{
this.type = requireNonNull(type, "type is null");
this.serializedTableFunctionHandle = requireNonNull(serializedTableFunctionHandle, "serializedTableFunctionHandle is null");
this.functionName = requireNonNull(functionName, "functionName is null");
}

throw new UnsupportedOperationException();
@JsonProperty("@type")
public String getType()
{
return type;
}

@JsonProperty
public String getSerializedTableFunctionHandle()
{
return serializedTableFunctionHandle;
}

@JsonProperty("functionName")
public QualifiedObjectName getFunctionName()
{
return functionName;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ public interface ConnectorTableFunctionHandle
{
default ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction,
ConnectorSession session,
NodeManager nodeManager,
Object functionAndTypeManager)
NodeManager nodeManager)
{
throw new UnsupportedOperationException();
}
Expand Down