Skip to content

Commit b35c83f

Browse files
committed
fix mysql async side fill data exception(IndexOutOfBoundsException)
1 parent 81b9f4b commit b35c83f

File tree

3 files changed

+10
-1
lines changed

3 files changed

+10
-1
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ public static void main(String[] args) throws Exception {
105105
options.addOption("confProp", true, "env properties");
106106
options.addOption("mode", true, "deploy mode");
107107

108+
options.addOption("savePointPath", true, "Savepoint restore path");
109+
options.addOption("allowNonRestoredState", true, "Flag indicating whether non restored state is allowed if the savepoint");
110+
108111
CommandLineParser parser = new DefaultParser();
109112
CommandLine cl = parser.parse(options, args);
110113
String sql = cl.getOptionValue("sql");

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptionParser.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public LauncherOptionParser(String[] args) {
105105
Preconditions.checkNotNull(remotePlugin);
106106
properties.setRemoteSqlPluginPath(remotePlugin);
107107
}
108+
108109
String name = Preconditions.checkNotNull(cl.getOptionValue(OPTION_NAME));
109110
properties.setName(name);
110111
String addJar = cl.getOptionValue(OPTION_ADDJAR);
@@ -153,6 +154,11 @@ public List<String> getProgramExeArgList() throws Exception {
153154
|| OPTION_YARN_CONF_DIR.equalsIgnoreCase(key)){
154155
continue;
155156
}
157+
158+
if(one.getValue() == null){
159+
continue;
160+
}
161+
156162
args.add("-" + key);
157163
args.add(one.getValue().toString());
158164
}

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public Row fillData(Row input, Object line){
191191
row.setField(entry.getKey(), obj);
192192
}
193193

194-
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){
194+
for(Map.Entry<Integer, Integer> entry : sideInfo.getSideFieldIndex().entrySet()){
195195
if(jsonArray == null){
196196
row.setField(entry.getKey(), null);
197197
}else{

0 commit comments

Comments
 (0)