-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Adds backlog reporting support for non-fnapi based SDF's. #38346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a8ed2f2
71ffc9a
2f06938
43222da
dcb239b
26e498e
35769fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| { | ||
| "comment": "Modify this file in a trivial way to cause this test suite to run!", | ||
| "modification": 2, | ||
| "modification": 1, | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -130,114 +130,114 @@ public Result invokeProcessElement( | |
| final Map<String, PCollectionView<?>> sideInputMapping) { | ||
| final ProcessContext processContext = new ProcessContext(element, tracker, watermarkEstimator); | ||
|
|
||
| DoFn.ProcessContinuation cont = | ||
| invoker.invokeProcessElement( | ||
| new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { | ||
| @Override | ||
| public String getErrorContext() { | ||
| return OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName(); | ||
| } | ||
|
|
||
| @Override | ||
| public DoFn<InputT, OutputT>.ProcessContext processContext( | ||
| DoFn<InputT, OutputT> doFn) { | ||
| return processContext; | ||
| } | ||
|
|
||
| @Override | ||
| public Object sideInput(String tagId) { | ||
| PCollectionView<?> view = sideInputMapping.get(tagId); | ||
| if (view == null) { | ||
| throw new IllegalArgumentException("calling getSideInput() with unknown view"); | ||
| } | ||
| return processContext.sideInput(view); | ||
| } | ||
|
|
||
| @Override | ||
| public Object restriction() { | ||
| return tracker.currentRestriction(); | ||
| } | ||
|
|
||
| @Override | ||
| public InputT element(DoFn<InputT, OutputT> doFn) { | ||
| return processContext.element(); | ||
| } | ||
|
|
||
| @Override | ||
| public Instant timestamp(DoFn<InputT, OutputT> doFn) { | ||
| return processContext.timestamp(); | ||
| } | ||
|
|
||
| @Override | ||
| public String timerId(DoFn<InputT, OutputT> doFn) { | ||
| throw new UnsupportedOperationException( | ||
| "Cannot access timerId as parameter outside of @OnTimer method."); | ||
| } | ||
|
|
||
| @Override | ||
| public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) { | ||
| throw new UnsupportedOperationException( | ||
| "Access to time domain not supported in ProcessElement"); | ||
| } | ||
|
|
||
| @Override | ||
| public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) { | ||
| return DoFnOutputReceivers.windowedReceiver( | ||
| processContext, OutputBuilderSuppliers.supplierForElement(element), null); | ||
| } | ||
|
|
||
| @Override | ||
| public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) { | ||
| throw new UnsupportedOperationException("Not supported in SplittableDoFn"); | ||
| } | ||
|
|
||
| @Override | ||
| public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { | ||
| return DoFnOutputReceivers.windowedMultiReceiver( | ||
| processContext, OutputBuilderSuppliers.supplierForElement(element)); | ||
| } | ||
|
|
||
| @Override | ||
| public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) { | ||
| return processContext.causedByDrain(); | ||
| } | ||
|
|
||
| @Override | ||
| public RestrictionTracker<?, ?> restrictionTracker() { | ||
| return processContext.tracker; | ||
| } | ||
|
|
||
| @Override | ||
| public WatermarkEstimator<?> watermarkEstimator() { | ||
| return processContext.watermarkEstimator; | ||
| } | ||
|
|
||
| @Override | ||
| public PipelineOptions pipelineOptions() { | ||
| return pipelineOptions; | ||
| } | ||
|
|
||
| @Override | ||
| public BundleFinalizer bundleFinalizer() { | ||
| return bundleFinalizer.get(); | ||
| } | ||
|
|
||
| // Unsupported methods below. | ||
|
|
||
| @Override | ||
| public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { | ||
| throw new IllegalStateException( | ||
| "Should not access startBundleContext() from @" | ||
| + DoFn.ProcessElement.class.getSimpleName()); | ||
| } | ||
|
|
||
| @Override | ||
| public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { | ||
| throw new IllegalStateException( | ||
| "Should not access finishBundleContext() from @" | ||
| + DoFn.ProcessElement.class.getSimpleName()); | ||
| } | ||
| }); | ||
| DoFnInvoker.BaseArgumentProvider<InputT, OutputT> invokerArgumentProvider = | ||
| new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { | ||
| @Override | ||
| public String getErrorContext() { | ||
| return OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName(); | ||
| } | ||
|
|
||
| @Override | ||
| public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { | ||
| return processContext; | ||
| } | ||
|
|
||
| @Override | ||
| public Object sideInput(String tagId) { | ||
| PCollectionView<?> view = sideInputMapping.get(tagId); | ||
| if (view == null) { | ||
| throw new IllegalArgumentException("calling getSideInput() with unknown view"); | ||
| } | ||
| return processContext.sideInput(view); | ||
| } | ||
|
|
||
| @Override | ||
| public Object restriction() { | ||
| return tracker.currentRestriction(); | ||
| } | ||
|
|
||
| @Override | ||
| public InputT element(DoFn<InputT, OutputT> doFn) { | ||
| return processContext.element(); | ||
| } | ||
|
|
||
| @Override | ||
| public Instant timestamp(DoFn<InputT, OutputT> doFn) { | ||
| return processContext.timestamp(); | ||
| } | ||
|
|
||
| @Override | ||
| public String timerId(DoFn<InputT, OutputT> doFn) { | ||
| throw new UnsupportedOperationException( | ||
| "Cannot access timerId as parameter outside of @OnTimer method."); | ||
| } | ||
|
|
||
| @Override | ||
| public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) { | ||
| throw new UnsupportedOperationException( | ||
| "Access to time domain not supported in ProcessElement"); | ||
| } | ||
|
|
||
| @Override | ||
| public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) { | ||
| return DoFnOutputReceivers.windowedReceiver( | ||
| processContext, OutputBuilderSuppliers.supplierForElement(element), null); | ||
| } | ||
|
|
||
| @Override | ||
| public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) { | ||
| throw new UnsupportedOperationException("Not supported in SplittableDoFn"); | ||
| } | ||
|
|
||
| @Override | ||
| public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { | ||
| return DoFnOutputReceivers.windowedMultiReceiver( | ||
| processContext, OutputBuilderSuppliers.supplierForElement(element)); | ||
| } | ||
|
|
||
| @Override | ||
| public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) { | ||
| return processContext.causedByDrain(); | ||
| } | ||
|
|
||
| @Override | ||
| public RestrictionTracker<?, ?> restrictionTracker() { | ||
| return processContext.tracker; | ||
| } | ||
|
|
||
| @Override | ||
| public WatermarkEstimator<?> watermarkEstimator() { | ||
| return processContext.watermarkEstimator; | ||
| } | ||
|
|
||
| @Override | ||
| public PipelineOptions pipelineOptions() { | ||
| return pipelineOptions; | ||
| } | ||
|
|
||
| @Override | ||
| public BundleFinalizer bundleFinalizer() { | ||
| return bundleFinalizer.get(); | ||
| } | ||
|
|
||
| // Unsupported methods below. | ||
|
|
||
| @Override | ||
| public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { | ||
| throw new IllegalStateException( | ||
| "Should not access startBundleContext() from @" | ||
| + DoFn.ProcessElement.class.getSimpleName()); | ||
| } | ||
|
|
||
| @Override | ||
| public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { | ||
| throw new IllegalStateException( | ||
| "Should not access finishBundleContext() from @" | ||
| + DoFn.ProcessElement.class.getSimpleName()); | ||
| } | ||
| }; | ||
|
|
||
| DoFn.ProcessContinuation cont = invoker.invokeProcessElement(invokerArgumentProvider); | ||
| processContext.cancelScheduledCheckpoint(); | ||
| @Nullable | ||
| KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> residual = | ||
|
|
@@ -278,8 +278,37 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { | |
| if (residual == null) { | ||
| return new Result(null, cont, null, null); | ||
| } | ||
| final KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> residualForGetSize = residual; | ||
| // For a list of all DoFnInvoker arguments, see DoFn.java. | ||
| double backlogBytes = | ||
| invoker.invokeGetSize( | ||
| new DoFnInvoker.DelegatingArgumentProvider<InputT, OutputT>( | ||
| invokerArgumentProvider, invokerArgumentProvider.getErrorContext() + "/GetSize") { | ||
| @Override | ||
| public Object restriction() { | ||
| return residualForGetSize.getKey(); | ||
| } | ||
|
|
||
| @Override | ||
| public RestrictionTracker<?, ?> restrictionTracker() { | ||
| return invoker.invokeNewTracker( | ||
| new DoFnInvoker.DelegatingArgumentProvider<InputT, OutputT>( | ||
| invokerArgumentProvider, | ||
| invokerArgumentProvider.getErrorContext() + "/NewTracker") { | ||
|
|
||
| @Override | ||
| public Object restriction() { | ||
| return residualForGetSize.getKey(); | ||
| } | ||
| }); | ||
| } | ||
| }); | ||
| return new Result( | ||
| residual.getKey(), cont, residual.getValue().getKey(), residual.getValue().getValue()); | ||
| residual.getKey(), | ||
| cont, | ||
| residual.getValue().getKey(), | ||
| residual.getValue().getValue(), | ||
| backlogBytes); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We expect to get this information only during finishBundle ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not during finishBundle, but rather in processElement after we've finished processing the restriction. The idea is that we want to get the backlog (work remaining) of the residual restriction after we've finished processing the bundle. I don't think you can call tryClaim or otherwise change the restriction in finishBundle (since it requires keyed state and whatnot).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we do any form of validation of the value returned here before sending it to the runner ? For example, ignore if negative or zero(wrong implementation but we probably don't want to pass that to the runner).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I decided to put the validation in the Dataflow execution context that way other runners could decide how they wanted to handle these edge cases. Thoughts? |
||
| } | ||
|
|
||
| private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about creating a util to get information from the residual instead of creating an inline class here. Probably also refactor other similar places if any.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I replaced this with a call to DelegatingArgumentProvider instead, which allows us to reuse most of the original argument provider used for ProcessElement.