1919package com .dtstack .flink .sql .launcher ;
2020
2121import avro .shaded .com .google .common .collect .Lists ;
22+ import com .dtstack .flink .sql .util .PluginUtil ;
2223import org .apache .commons .cli .BasicParser ;
2324import org .apache .commons .cli .CommandLine ;
2425import org .apache .commons .cli .Options ;
2526import org .apache .commons .lang .StringUtils ;
2627import org .apache .flink .hadoop .shaded .com .google .common .base .Charsets ;
2728import org .apache .flink .hadoop .shaded .com .google .common .base .Preconditions ;
28-
2929import java .io .File ;
3030import java .io .FileInputStream ;
3131import java .net .URLEncoder ;
3232import java .util .List ;
3333import java .util .Map ;
34- import java .util .Properties ;
35-
36- import static com .dtstack .flink .sql .launcher .LauncherOptions .*;
37- import static com .dtstack .flink .sql .launcher .ClusterMode .*;
38-
34+ import com .dtstack .flink .sql .ClusterMode ;
3935
4036/**
4137 * The Parser of Launcher commandline options
4541 */
4642public class LauncherOptionParser {
4743
44+ public static final String OPTION_MODE = "mode" ;
45+
46+ public static final String OPTION_NAME = "name" ;
47+
48+ public static final String OPTION_SQL = "sql" ;
49+
50+ public static final String OPTION_FLINK_CONF_DIR = "flinkconf" ;
51+
52+ public static final String OPTION_YARN_CONF_DIR = "yarnconf" ;
53+
54+ public static final String OPTION_LOCAL_SQL_PLUGIN_PATH = "localSqlPluginPath" ;
55+
56+ public static final String OPTION_REMOTE_SQL_PLUGIN_PATH = "remoteSqlPluginPath" ;
57+
58+ public static final String OPTION_ADDJAR = "addjar" ;
59+
60+ public static final String OPTION_CONF_PROP = "confProp" ;
61+
62+ public static final String OPTION_SAVE_POINT_PATH = "savePointPath" ;
63+
64+ public static final String OPTION_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState" ;
65+
4866 private Options options = new Options ();
4967
5068 private BasicParser parser = new BasicParser ();
5169
52- private Properties properties = new Properties ();
70+ private LauncherOptions properties = new LauncherOptions ();
5371
5472 public LauncherOptionParser (String [] args ) {
55- options .addOption (LauncherOptions . OPTION_MODE , true , "Running mode" );
73+ options .addOption (OPTION_MODE , true , "Running mode" );
5674 options .addOption (OPTION_SQL , true , "Job sql file" );
5775 options .addOption (OPTION_NAME , true , "Job name" );
5876 options .addOption (OPTION_FLINK_CONF_DIR , true , "Flink configuration directory" );
@@ -62,11 +80,14 @@ public LauncherOptionParser(String[] args) {
6280 options .addOption (OPTION_CONF_PROP , true , "sql ref prop,eg specify event time" );
6381 options .addOption (OPTION_YARN_CONF_DIR , true , "Yarn and hadoop configuration directory" );
6482
83+ options .addOption (OPTION_SAVE_POINT_PATH , true , "Savepoint restore path" );
84+ options .addOption (OPTION_ALLOW_NON_RESTORED_STATE , true , "Flag indicating whether non restored state is allowed if the savepoint" );
85+
6586 try {
6687 CommandLine cl = parser .parse (options , args );
67- String mode = cl .getOptionValue (OPTION_MODE , MODE_LOCAL );
88+ String mode = cl .getOptionValue (OPTION_MODE , ClusterMode . local . name () );
6889 //check mode
69- properties .put ( OPTION_MODE , mode );
90+ properties .setMode ( mode );
7091
7192 String job = Preconditions .checkNotNull (cl .getOptionValue (OPTION_SQL ),
7293 "Must specify job file using option '" + OPTION_SQL + "'" );
@@ -76,78 +97,65 @@ public LauncherOptionParser(String[] args) {
7697 in .read (filecontent );
7798 String content = new String (filecontent , "UTF-8" );
7899 String sql = URLEncoder .encode (content , Charsets .UTF_8 .name ());
79- properties .put (OPTION_SQL , sql );
80-
100+ properties .setSql (sql );
81101 String localPlugin = Preconditions .checkNotNull (cl .getOptionValue (OPTION_LOCAL_SQL_PLUGIN_PATH ));
82- properties .put (OPTION_LOCAL_SQL_PLUGIN_PATH , localPlugin );
83-
102+ properties .setLocalSqlPluginPath (localPlugin );
84103 String remotePlugin = cl .getOptionValue (OPTION_REMOTE_SQL_PLUGIN_PATH );
85- if (!mode . equalsIgnoreCase ( ClusterMode . MODE_LOCAL )){
104+ if (!ClusterMode . local . name (). equals ( mode )){
86105 Preconditions .checkNotNull (remotePlugin );
87- properties .put ( OPTION_REMOTE_SQL_PLUGIN_PATH , remotePlugin );
106+ properties .setRemoteSqlPluginPath ( remotePlugin );
88107 }
89-
90108 String name = Preconditions .checkNotNull (cl .getOptionValue (OPTION_NAME ));
91- properties .put (OPTION_NAME , name );
92-
109+ properties .setName (name );
93110 String addJar = cl .getOptionValue (OPTION_ADDJAR );
94111 if (StringUtils .isNotBlank (addJar )){
95- properties .put ( OPTION_ADDJAR , addJar );
112+ properties .setAddjar ( addJar );
96113 }
97-
98114 String confProp = cl .getOptionValue (OPTION_CONF_PROP );
99115 if (StringUtils .isNotBlank (confProp )){
100- properties .put ( OPTION_CONF_PROP , confProp );
116+ properties .setConfProp ( confProp );
101117 }
102-
103118 String flinkConfDir = cl .getOptionValue (OPTION_FLINK_CONF_DIR );
104119 if (StringUtils .isNotBlank (flinkConfDir )) {
105- properties .put ( OPTION_FLINK_CONF_DIR , flinkConfDir );
120+ properties .setFlinkconf ( flinkConfDir );
106121 }
107122
108123 String yarnConfDir = cl .getOptionValue (OPTION_YARN_CONF_DIR );
109124 if (StringUtils .isNotBlank (yarnConfDir )) {
110- properties .put (OPTION_YARN_CONF_DIR , yarnConfDir );
125+ properties .setYarnconf (yarnConfDir );
126+ }
127+
128+ String savePointPath = cl .getOptionValue (OPTION_SAVE_POINT_PATH );
129+ if (StringUtils .isNotBlank (savePointPath )) {
130+ properties .setSavePointPath (savePointPath );
131+ }
132+
133+ String allow_non = cl .getOptionValue (OPTION_ALLOW_NON_RESTORED_STATE );
134+ if (StringUtils .isNotBlank (allow_non )) {
135+ properties .setAllowNonRestoredState (allow_non );
111136 }
112137
113138 } catch (Exception e ) {
114139 throw new RuntimeException (e );
115140 }
116-
117141 }
118142
119- public Properties getProperties (){
143+ public LauncherOptions getLauncherOptions (){
120144 return properties ;
121145 }
122146
123- public Object getVal (String key ){
124- return properties .get (key );
125- }
126-
127- public List <String > getAllArgList (){
147+ public List <String > getProgramExeArgList () throws Exception {
148+ Map <String ,Object > mapConf = PluginUtil .ObjectToMap (properties );
128149 List <String > args = Lists .newArrayList ();
129- for (Map .Entry <Object , Object > one : properties .entrySet ()){
130- args .add ("-" + one .getKey ().toString ());
131- args .add (one .getValue ().toString ());
132- }
133-
134- return args ;
135- }
136-
137- public List <String > getProgramExeArgList (){
138- List <String > args = Lists .newArrayList ();
139- for (Map .Entry <Object , Object > one : properties .entrySet ()){
140- String key = one .getKey ().toString ();
150+ for (Map .Entry <String , Object > one : mapConf .entrySet ()){
151+ String key = one .getKey ();
141152 if (OPTION_FLINK_CONF_DIR .equalsIgnoreCase (key )
142153 || OPTION_YARN_CONF_DIR .equalsIgnoreCase (key )){
143154 continue ;
144155 }
145-
146156 args .add ("-" + key );
147157 args .add (one .getValue ().toString ());
148158 }
149-
150159 return args ;
151160 }
152-
153161}
0 commit comments