Skip to content

Commit eae2d2e

Browse files
authored
[BUGFIX] Fix experiment engine plugins in routers with the traffic rules (#168)
* - fix concurrency issue, when RPC plugins are used in the router deployments with defined traffic rules * - fix concurrency issue, when RPC plugins are used in the router deployments with defined traffic rules
1 parent e108820 commit eae2d2e

File tree

12 files changed

+167
-58
lines changed

12 files changed

+167
-58
lines changed

engines/experiment/config.go renamed to engines/experiment/config/config.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
package experiment
1+
package config
2+
3+
import "encoding/json"
24

35
// EngineConfig is a struct used to decode engine's configuration into
46
// It consists of an optional PluginBinary (if the experiment engine is implemented
@@ -9,3 +11,11 @@ type EngineConfig struct {
911
PluginURL string `mapstructure:"plugin_url"`
1012
EngineConfiguration map[string]interface{} `mapstructure:",remain"`
1113
}
14+
15+
func (c EngineConfig) IsPlugin() bool {
16+
return c.PluginBinary != "" || c.PluginURL != ""
17+
}
18+
19+
func (c EngineConfig) RawEngineConfig() (json.RawMessage, error) {
20+
return json.Marshal(c.EngineConfiguration)
21+
}

engines/experiment/config_test.go renamed to engines/experiment/config/config_test.go

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
1-
package experiment_test
1+
package config_test
22

33
import (
44
"testing"
55

6-
"github.com/gojek/turing/engines/experiment"
6+
"github.com/gojek/turing/engines/experiment/config"
7+
78
"github.com/mitchellh/mapstructure"
89
"github.com/stretchr/testify/assert"
910
)
1011

1112
func Test_EngineConfigDecoding(t *testing.T) {
1213
var suite = map[string]struct {
1314
cfg interface{}
14-
expected experiment.EngineConfig
15+
expected config.EngineConfig
1516
err string
1617
}{
1718
"success | plugin": {
1819
cfg: map[string]interface{}{
1920
"plugin_binary": "path/to/binary",
2021
},
21-
expected: experiment.EngineConfig{
22+
expected: config.EngineConfig{
2223
PluginBinary: "path/to/binary",
2324
EngineConfiguration: nil,
2425
},
@@ -31,7 +32,7 @@ func Test_EngineConfigDecoding(t *testing.T) {
3132
"Key2-1": "Value2-1",
3233
},
3334
},
34-
expected: experiment.EngineConfig{
35+
expected: config.EngineConfig{
3536
PluginBinary: "path/to/binary",
3637
EngineConfiguration: map[string]interface{}{
3738
"Key1": "Value1",
@@ -45,7 +46,7 @@ func Test_EngineConfigDecoding(t *testing.T) {
4546
cfg: map[string]interface{}{
4647
"Key1": "Value1",
4748
},
48-
expected: experiment.EngineConfig{
49+
expected: config.EngineConfig{
4950
EngineConfiguration: map[string]interface{}{
5051
"Key1": "Value1",
5152
},
@@ -64,7 +65,7 @@ func Test_EngineConfigDecoding(t *testing.T) {
6465

6566
for name, tt := range suite {
6667
t.Run(name, func(t *testing.T) {
67-
var actual experiment.EngineConfig
68+
var actual config.EngineConfig
6869
err := mapstructure.Decode(tt.cfg, &actual)
6970

7071
if tt.err != "" {
@@ -76,3 +77,41 @@ func Test_EngineConfigDecoding(t *testing.T) {
7677
})
7778
}
7879
}
80+
81+
func TestEngineConfig_IsPlugin(t *testing.T) {
82+
var suite = map[string]struct {
83+
cfg config.EngineConfig
84+
expected bool
85+
}{
86+
"success | binary path provided": {
87+
cfg: config.EngineConfig{
88+
PluginBinary: "/app/plugins/my_plugin",
89+
},
90+
expected: true,
91+
},
92+
"success | download url provided": {
93+
cfg: config.EngineConfig{
94+
PluginURL: "http://localhost:8080/plugins/my_plugin",
95+
},
96+
expected: true,
97+
},
98+
"success | binary path and download url provided": {
99+
cfg: config.EngineConfig{
100+
PluginBinary: "/app/plugins/my_plugin",
101+
PluginURL: "http://localhost:8080/plugins/my_plugin",
102+
},
103+
expected: true,
104+
},
105+
"success | not provided": {
106+
cfg: config.EngineConfig{},
107+
expected: false,
108+
},
109+
}
110+
111+
for name, tt := range suite {
112+
t.Run(name, func(t *testing.T) {
113+
actual := tt.cfg.IsPlugin()
114+
assert.Equal(t, tt.expected, actual)
115+
})
116+
}
117+
}

engines/experiment/factory.go

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,7 @@
11
package experiment
22

33
import (
4-
"encoding/json"
5-
"fmt"
6-
"net/url"
7-
"path"
8-
9-
"github.com/gojek/turing/engines/experiment/pkg/utils"
10-
4+
"github.com/gojek/turing/engines/experiment/config"
115
"github.com/gojek/turing/engines/experiment/manager"
126
"github.com/gojek/turing/engines/experiment/plugin/inproc"
137
"github.com/gojek/turing/engines/experiment/plugin/rpc"
@@ -29,36 +23,16 @@ type EngineFactory interface {
2923
// - experiment/plugin/rpc/factory (for experiment engines implemented as external net/rpc plugins)
3024
// The actual implementation is determined based on provided engine configuration (passed via `cfg`)
3125
func NewEngineFactory(name string, cfg map[string]interface{}, logger *zap.SugaredLogger) (EngineFactory, error) {
32-
var engineCfg EngineConfig
26+
var engineCfg config.EngineConfig
3327
if err := mapstructure.Decode(cfg, &engineCfg); err != nil {
3428
return nil, err
3529
}
3630

37-
engineCfgJSON, err := json.Marshal(engineCfg.EngineConfiguration)
38-
if err != nil {
39-
return nil, err
40-
}
41-
4231
// plugin-based implementation of the experiment engine factory
43-
if engineCfg.PluginBinary != "" {
44-
return rpc.NewFactory(engineCfg.PluginBinary, engineCfgJSON, logger)
45-
}
46-
47-
if engineCfg.PluginURL != "" {
48-
downloadURL, err := url.Parse(engineCfg.PluginURL)
49-
if err != nil {
50-
return nil, fmt.Errorf("failed to parse plugin URL: %v", err)
51-
}
52-
53-
filename := fmt.Sprintf("./%s", path.Base(downloadURL.Path))
54-
err = utils.DownloadFile(downloadURL, filename, 0744)
55-
if err != nil {
56-
return nil, fmt.Errorf(
57-
"failed to download plugin's binary from remote url: url=%s, %v", engineCfg.PluginURL, err)
58-
}
59-
return rpc.NewFactory(filename, engineCfgJSON, logger)
32+
if engineCfg.IsPlugin() {
33+
return rpc.NewFactory(name, engineCfg, logger)
6034
}
6135

6236
// compile-time implementation of the experiment engine factory
63-
return inproc.NewEngineFactory(name, engineCfgJSON)
37+
return inproc.NewEngineFactory(name, engineCfg)
6438
}

engines/experiment/factory_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"fmt"
77
"testing"
88

9+
"github.com/gojek/turing/engines/experiment/config"
10+
911
"bou.ke/monkey"
1012
"github.com/gojek/turing/engines/experiment"
1113
"github.com/gojek/turing/engines/experiment/plugin/inproc"
@@ -41,7 +43,7 @@ func Test_NewEngineFactory(t *testing.T) {
4143
},
4244
withPatch: func(expected experiment.EngineFactory, fn func()) {
4345
monkey.Patch(rpc.NewFactory,
44-
func(string, json.RawMessage, *zap.SugaredLogger) (*rpc.EngineFactory, error) {
46+
func(string, config.EngineConfig, *zap.SugaredLogger) (*rpc.EngineFactory, error) {
4547
return expected.(*rpc.EngineFactory), nil
4648
},
4749
)

engines/experiment/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ require (
1010
github.com/hashicorp/go-hclog v0.16.0
1111
github.com/hashicorp/go-plugin v1.4.3
1212
github.com/leodido/go-urn v1.2.1 // indirect
13+
github.com/mitchellh/hashstructure/v2 v2.0.2
1314
github.com/mitchellh/mapstructure v1.4.3
1415
github.com/pkg/errors v0.9.1
1516
github.com/stretchr/testify v1.7.0

engines/experiment/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ github.com/mattn/go-isatty v0.0.10 h1:qxFzApOv4WsAL965uUPIsXzAKCZxN2p9UqdhFS4ZW1
5959
github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
6060
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77 h1:7GoSOOW2jpsfkntVKaS2rAr1TJqfcxotyaUcuxoZSzg=
6161
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
62+
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
63+
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
6264
github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs=
6365
github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
6466
github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw=

engines/experiment/plugin/inproc/factory.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package inproc
33
import (
44
"encoding/json"
55

6+
"github.com/gojek/turing/engines/experiment/config"
7+
68
"github.com/gojek/turing/engines/experiment/manager"
79
managerPlugin "github.com/gojek/turing/engines/experiment/plugin/inproc/manager"
810
runnerPlugin "github.com/gojek/turing/engines/experiment/plugin/inproc/runner"
@@ -24,9 +26,13 @@ func (f *EngineFactory) GetExperimentRunner() (runner.ExperimentRunner, error) {
2426
return runnerPlugin.Get(f.EngineName, f.EngineConfig)
2527
}
2628

27-
func NewEngineFactory(name string, cfg json.RawMessage) (*EngineFactory, error) {
29+
func NewEngineFactory(name string, cfg config.EngineConfig) (*EngineFactory, error) {
30+
engineCfg, err := cfg.RawEngineConfig()
31+
if err != nil {
32+
return nil, err
33+
}
2834
return &EngineFactory{
2935
EngineName: name,
30-
EngineConfig: cfg,
36+
EngineConfig: engineCfg,
3137
}, nil
3238
}

engines/experiment/plugin/inproc/factory_test.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"errors"
66
"testing"
77

8+
"github.com/gojek/turing/engines/experiment/config"
9+
810
"bou.ke/monkey"
911
"github.com/gojek/turing/engines/experiment/manager"
1012
mocksManager "github.com/gojek/turing/engines/experiment/manager/mocks"
@@ -19,19 +21,25 @@ import (
1921
func Test_NewEngineFactory(t *testing.T) {
2022
suite := map[string]struct {
2123
engine string
22-
cfg json.RawMessage
24+
cfg config.EngineConfig
2325
}{
2426
"success": {
2527
engine: "engine-1",
26-
cfg: json.RawMessage("{\"my_config\": \"my_value\"}"),
28+
cfg: config.EngineConfig{
29+
EngineConfiguration: map[string]interface{}{
30+
"my_config": "my_value",
31+
},
32+
},
2733
},
2834
}
2935
for name, tt := range suite {
3036
t.Run(name, func(t *testing.T) {
3137
factory, err := plugin.NewEngineFactory(tt.engine, tt.cfg)
3238
assert.NoError(t, err)
3339
assert.Equal(t, tt.engine, factory.EngineName)
34-
assert.Equal(t, tt.cfg, factory.EngineConfig)
40+
41+
rawConfig, _ := tt.cfg.RawEngineConfig()
42+
assert.Equal(t, rawConfig, factory.EngineConfig)
3543
})
3644
}
3745
}
@@ -67,7 +75,7 @@ func TestEngineFactory_GetExperimentManager(t *testing.T) {
6775

6876
for name, tt := range suite {
6977
t.Run(name, func(t *testing.T) {
70-
factory, _ := plugin.NewEngineFactory(tt.engine, nil)
78+
factory, _ := plugin.NewEngineFactory(tt.engine, config.EngineConfig{})
7179
withPatchedManagerRegistry(tt.expected, tt.err, func() {
7280
actual, err := factory.GetExperimentManager()
7381
if tt.err != "" {
@@ -112,7 +120,7 @@ func TestEngineFactory_GetExperimentRunner(t *testing.T) {
112120

113121
for name, tt := range suite {
114122
t.Run(name, func(t *testing.T) {
115-
factory, _ := plugin.NewEngineFactory(tt.engine, nil)
123+
factory, _ := plugin.NewEngineFactory(tt.engine, config.EngineConfig{})
116124
withPatchedRunnerRegistry(tt.expected, tt.err, func() {
117125
actual, err := factory.GetExperimentRunner()
118126
if tt.err != "" {

engines/experiment/plugin/rpc/factory.go

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,24 @@ package rpc
33
import (
44
"encoding/json"
55
"fmt"
6+
"net/url"
7+
"path"
68
"reflect"
79
"sync"
810

11+
"github.com/gojek/turing/engines/experiment/config"
912
"github.com/gojek/turing/engines/experiment/manager"
13+
"github.com/gojek/turing/engines/experiment/pkg/utils"
1014
"github.com/gojek/turing/engines/experiment/plugin/rpc/shared"
1115
"github.com/gojek/turing/engines/experiment/runner"
1216
"github.com/hashicorp/go-plugin"
17+
"github.com/mitchellh/hashstructure/v2"
1318
"go.uber.org/zap"
1419
)
1520

21+
var factoriesmu sync.Mutex
22+
var factories = make(map[string]*EngineFactory)
23+
1624
// EngineFactory implements experiment.EngineFactory and creates experiment manager/runner
1725
// backed by net/rpc plugin implementations
1826
type EngineFactory struct {
@@ -74,7 +82,44 @@ func (f *EngineFactory) GetExperimentRunner() (runner.ExperimentRunner, error) {
7482
return f.runner, nil
7583
}
7684

77-
func NewFactory(pluginBinary string, engineCfg json.RawMessage, logger *zap.SugaredLogger) (*EngineFactory, error) {
85+
func NewFactory(name string, cfg config.EngineConfig, logger *zap.SugaredLogger) (*EngineFactory, error) {
86+
factoriesmu.Lock()
87+
defer factoriesmu.Unlock()
88+
89+
// get a hash of the engine's configuration and use it as a configuration's fingerprint
90+
cfgHash, err := hashstructure.Hash(cfg, hashstructure.FormatV2, nil)
91+
if err != nil {
92+
return nil, err
93+
}
94+
factoryKey := fmt.Sprintf("%s-%d", name, cfgHash)
95+
96+
if engineFactory, ok := factories[factoryKey]; ok {
97+
return engineFactory, nil
98+
}
99+
100+
engineCfg, err := cfg.RawEngineConfig()
101+
if err != nil {
102+
return nil, err
103+
}
104+
if cfg.PluginBinary != "" {
105+
factories[factoryKey], err = NewFactoryFromBinary(cfg.PluginBinary, engineCfg, logger)
106+
} else if cfg.PluginURL != "" {
107+
factories[factoryKey], err = NewFactoryFromURL(cfg.PluginURL, engineCfg, logger)
108+
} else {
109+
err = fmt.Errorf("either `plugin_url` or `plugin_binary` must be specified")
110+
}
111+
112+
if err != nil {
113+
return nil, err
114+
}
115+
return factories[factoryKey], nil
116+
}
117+
118+
func NewFactoryFromBinary(
119+
pluginBinary string,
120+
engineCfg json.RawMessage,
121+
logger *zap.SugaredLogger,
122+
) (*EngineFactory, error) {
78123
rpcClient, err := Connect(pluginBinary, logger.Desugar())
79124
if err != nil {
80125
return nil, err
@@ -85,3 +130,22 @@ func NewFactory(pluginBinary string, engineCfg json.RawMessage, logger *zap.Suga
85130
EngineConfig: engineCfg,
86131
}, nil
87132
}
133+
134+
func NewFactoryFromURL(
135+
pluginURL string,
136+
engineCfg json.RawMessage,
137+
logger *zap.SugaredLogger,
138+
) (*EngineFactory, error) {
139+
downloadURL, err := url.Parse(pluginURL)
140+
if err != nil {
141+
return nil, fmt.Errorf("failed to parse plugin URL: %v", err)
142+
}
143+
144+
filename := fmt.Sprintf("./%s", path.Base(downloadURL.Path))
145+
err = utils.DownloadFile(downloadURL, filename, 0744)
146+
if err != nil {
147+
return nil, fmt.Errorf(
148+
"failed to download plugin's binary from remote url: url=%s, %v", pluginURL, err)
149+
}
150+
return NewFactoryFromBinary(filename, engineCfg, logger)
151+
}

0 commit comments

Comments
 (0)