[REVIEW-ONLY] Python binding for high level APIs #613
[REVIEW-ONLY] Python binding for high level APIs #613 — patilvikram wants to merge 1 commit into apache:master from
Conversation
| @@ -21,12 +21,13 @@ | |||
| import java.io.IOException; | |||
There was a problem hiding this comment.
Why is this change here? Can you rebase?
| </executions> | ||
| </plugin> | ||
| <plugin> | ||
| <artifactId>maven-dependency-plugin</artifactId> |
There was a problem hiding this comment.
Does it need to be redefined? Can it not be inherited from the parent?
python/pom.xml
Outdated
| <groupId>org.apache.hadoop</groupId> | ||
| <artifactId>hadoop-client</artifactId> | ||
| <version>${hadoop.version}</version> | ||
| <!--<scope>provided</scope>--> |
There was a problem hiding this comment.
Why are these not provided?
python/examples/stock_filter.py
Outdated
| @@ -0,0 +1,14 @@ | |||
|
|
|||
There was a problem hiding this comment.
Can this go into standard malhar/examples?
|
There are some binary files checked in, for example py4j-0.10.4.jar, py4j-0.10.4-src.zip, and py4j-0.10.4.tar.gz. Are these needed? |
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.apex.malhar.python.operator; |
There was a problem hiding this comment.
The package path doesn't have stream. What is the benefit of having this class here instead of in the python package?
| int pythonServerStartAttempts = 5; | ||
| while (!this.py4jListener.isPythonServerStarted() && !this.pythonWorkerProxy.isFunctionEnabled() && pythonServerStartAttempts > 0) { | ||
| try { | ||
| Thread.sleep(5000L); |
There was a problem hiding this comment.
Timeout should be configurable
| Thread.sleep(5000L); | ||
| LOG.debug("Waiting for Python Worker Registration"); | ||
| --pythonServerStartAttempts; | ||
| } catch (InterruptedException var9) { |
There was a problem hiding this comment.
The variable name sounds generated.
| this.pythonWorkerProxy.setFunction(this.operationType.getType()); | ||
| } | ||
| if (!this.py4jListener.isPythonServerStarted()) { | ||
| LOG.error("Python server could not be started"); |
There was a problem hiding this comment.
Should this throw an exception and fail?
| * @return new stream of type O | ||
| */ | ||
| <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort, Option... opts); | ||
| <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort, Option... opts); |
There was a problem hiding this comment.
Avoid formatting change if there is no other change here.
| * @param serializedFunction stores Serialized Function data | ||
| * @return new stream of type T | ||
| */ | ||
| <STREAM extends ApexStream<T>> STREAM map_func(byte[] serializedFunction, Option... opts); |
There was a problem hiding this comment.
Function name not in java style
| } | ||
| } | ||
|
|
||
| } |
There was a problem hiding this comment.
Formatting change may be unnecessary
| foundPort = serverSocket.getLocalPort(); | ||
| serverSocket.close(); | ||
| return foundPort; | ||
| } catch (IOException e) { |
tushargosavi
left a comment
There was a problem hiding this comment.
There are a few binary files present in the pull request; we need to discuss with the community which types of binary files are acceptable in the repository.
|
|
||
| public class InMemoryDataInputOperator<T> implements InputOperator | ||
| { | ||
|
|
docs/python/main.md
Outdated
|
|
||
| from pyapex import createApp | ||
| a=createApp('python_app').fromKafka08('localhost:2181','test_topic') \ | ||
| .setFilter('filter_operator',f) \ |
There was a problem hiding this comment.
I would prefer function names such as map/filter/flatmap rather than setMap, setFilter, setFlatmap.
docs/python/main.md
Outdated
|
|
||
| ``` | ||
|
|
||
| def f(a): |
There was a problem hiding this comment.
Give meaningful names to functions in the documentation. How about filterCondition?
| @Override | ||
| public void beginWindow(long l) | ||
| { | ||
|
|
| <groupId>org.apache.apex</groupId> | ||
| <artifactId>apex</artifactId> | ||
| <version>3.4.0</version> | ||
| <version>3.6.0</version> |
There was a problem hiding this comment.
Apache Apex dependency is already moved to 3.6, you can undo this change.
| } else { | ||
| pythonEnvPath = py4jDependencyFile.getAbsolutePath(); | ||
| } | ||
| LOG.debug("Final python environment path with Py4j depenency path: " + pythonEnvPath); |
There was a problem hiding this comment.
use {} in LOG statements
|
|
||
| import java.util.Map; | ||
|
|
||
| public interface PythonWorker<T> |
There was a problem hiding this comment.
Please add Javadoc for public interfaces, classes, and methods.
stream/pom.xml
Outdated
| <version>3.2.1</version> | ||
| </dependency> | ||
|
|
||
| <dependency> |
There was a problem hiding this comment.
remove these dependencies from stream package.
| */ | ||
| WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption, Duration allowLateness); | ||
|
|
||
|
|
| Assert.assertEquals("first", stream.getSource().getOperatorMeta().getName()); | ||
| Assert.assertTrue(1 == stream.getSinks().size()); | ||
| Assert.assertEquals("second", stream.getSinks().get(0).getOperatorWrapper().getName()); | ||
| Assert.assertEquals("second", ((List<LogicalPlan.InputPortMeta>)(stream.getSinks())).get(0).getOperatorMeta().getName()); |
There was a problem hiding this comment.
is this part of your code change?
|
@patilvikram please make sure that the Jenkins / Travis build passes. |
92a1f1b to
cf79a17
Compare
cf79a17 to
151413c
Compare
5336cd4 to
a043f96
Compare
| def __init__(self, trigger_type, count): | ||
| super(CountTrigger, self).__init__(trigger_type) | ||
| self.count = count | ||
|
|
There was a problem hiding this comment.
Remove extra lines from the bottom of this file.
There was a problem hiding this comment.
Add documentation to code
python/apex-python/src/setup.py
Outdated
| setup(name='pyapex', | ||
| version='0.0.4', | ||
| py_modules=['pyapex','pyapex.runtime','pyapex.functions'], | ||
| ) |
| fi | ||
| if [ -f "$MVN_GENERATED_PATH" ]; then | ||
| # development launch mode | ||
| DT_CORE_JAR="$BUILD_DIR/malhar-python-3.8.0-SNAPSHOT.jar" |
There was a problem hiding this comment.
Make this version dynamically stamped.
There was a problem hiding this comment.
Check how I can include current artifact in the classpath files like MVN_GENERATED_PATH
| #ln -sf $PYAPEX_HOME "$script_dir/pyapex" | ||
| if [ -z "$PYTHONPATH" ] | ||
| then | ||
| export PYTHONPATH="$PYAPEX_HOME/deps/pyapex-0.0.4-src.zip" |
There was a problem hiding this comment.
Add versions dynamically here
| { | ||
| private static final Logger LOG = LoggerFactory.getLogger(PythonGenericOperator.class); | ||
| protected byte[] serializedFunction = null; | ||
| private PythonServer server = null; |
There was a problem hiding this comment.
Make Python Server transient
| { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(PythonWindowedOperator.class); | ||
| private PythonServer server = null; |
There was a problem hiding this comment.
Make PythonServer server transient
| assert False, "Invalid Option" | ||
|
|
||
|
|
||
| if __name__ == "__main__": |
There was a problem hiding this comment.
Update this one in pyshell to call apexcli commands directly for kill / shutdown or any other updates
9a7f129 to
8d06aa2
Compare
|
Did this branch die? Has it been superseded by another? I got here from this ticket which was linked from the roadmap. |
@darrengarvey Right now it's still at the dangling stage; the review is not yet completed. Let me know if you have any questions about this issue or PR. |
@PramodSSImmaneni @chinmaykolhatkar
I have created this REVIEW-ONLY PR for extending Python support to the High Level APIs. Currently this is at an early stage, so I will add documentation which will help you set up the environment to launch a Python app from your machine. I will also include a simple example in this PR.