@@ -14,6 +14,7 @@ import (
1414 "github.com/deckhouse/deckhouse/pkg/log"
1515 "github.com/hashicorp/go-multierror"
1616 "go.opentelemetry.io/otel"
17+ "go.opentelemetry.io/otel/attribute"
1718
1819 "github.com/flant/addon-operator/pkg/app"
1920 "github.com/flant/addon-operator/pkg/helm"
@@ -1169,13 +1170,13 @@ func (mm *ModuleManager) AreModulesInited() bool {
11691170}
11701171
11711172// RunModuleWithNewOpenAPISchema updates the module's OpenAPI schema from modulePath directory and pushes RunModuleTask if the module is enabled
1172- func (mm * ModuleManager ) RunModuleWithNewOpenAPISchema (moduleName , moduleSource , modulePath string ) error {
1173+ func (mm * ModuleManager ) RunModuleWithNewOpenAPISchema (moduleName string ) error {
11731174 currentModule := mm .modules .Get (moduleName )
11741175 if currentModule == nil {
11751176 return fmt .Errorf ("failed to get basic module - not found" )
11761177 }
11771178
1178- basicModule , err := mm .moduleLoader .LoadModule (moduleSource , modulePath )
1179+ basicModule , err := mm .moduleLoader .LoadModule (moduleName )
11791180 if err != nil {
11801181 return fmt .Errorf ("load module: %w" , err )
11811182 }
@@ -1193,216 +1194,66 @@ func (mm *ModuleManager) RunModuleWithNewOpenAPISchema(moduleName, moduleSource,
11931194}
11941195
11951196// RegisterModule checks if a module already exists and reapplies(reloads) its configuration.
1196- // If it's a new module - converges all modules - EXPERIMENTAL
1197- func (mm * ModuleManager ) RegisterModule (_ , _ string ) error {
1198- return fmt .Errorf ("not implemented yet" )
1199- }
1200-
1201- /*
1202- if !mm.modules.IsInited() {
1203- return moduleset.ErrNotInited
1204- }
1205-
1206- if mm.ModulesDir == "" {
1207- log.Warnf("Empty modules directory is passed! No modules to load.")
1208- return nil
1209- }
1210-
1211- if mm.moduleLoader == nil {
1212- log.Errorf("no module loader set")
1213- return fmt.Errorf("no module loader set")
1214- }
1215-
1216- // get basic module definition
1217- basicModule, err := mm.moduleLoader.LoadModule(moduleSource, modulePath)
1218-
1219- if err != nil {
1220- return fmt.Errorf("failed to get basic module's definition: %w", err)
1221- }
1222-
1223- moduleName := basicModule.GetName()
1224-
1225- // load and registry global hooks
1226-
1227- dep := &hooks.HookExecutionDependencyContainer{
1228- HookMetricsStorage: mm.dependencies.HookMetricStorage,
1229- KubeConfigManager: mm.dependencies.KubeConfigManager,
1230- KubeObjectPatcher: mm.dependencies.KubeObjectPatcher,
1231- MetricStorage: mm.dependencies.MetricStorage,
1232- GlobalValuesGetter: mm.global,
1233- }
1234-
1235- basicModule.WithDependencies(dep)
1236-
1237- // check if module already exists
1238-
1239- if mm.modules.Has(moduleName) {
1240- // if module is disabled in module manager
1241- if !mm.IsModuleEnabled(moduleName) {
1242- // update(upsert) module config in moduleset
1243- mm.modules.Add(basicModule)
1244- // get kube config for the module to check if it has enabled: true
1245- moduleKubeConfigEnabled := mm.dependencies.KubeConfigManager.IsModuleEnabled(moduleName)
1246- // if module isn't explicitly enabled in the module kube config - exit
1247- if moduleKubeConfigEnabled == nil || (moduleKubeConfigEnabled != nil && !*moduleKubeConfigEnabled) {
1248- return nil
1249- }
1250- mm.AddEnabledModuleByConfigName(moduleName)
1251-
1252- // if the module kube config has enabled true, check enable script
1253- isEnabled, err := basicModule.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{})
1254- if err != nil {
1255- return err
1256- }
1257-
1258- if isEnabled {
1259- mm.SendModuleEvent(events.ModuleEvent{
1260- ModuleName: moduleName,
1261- EventType: events.ModuleEnabled,
1262- })
1263- err := mm.UpdateModuleKubeConfig(moduleName)
1264- if err != nil {
1265- return err
1266- }
1267- log.Infof("Push ConvergeModules task because %q Module was re-enabled", moduleName)
1268- mm.PushConvergeModulesTask(moduleName, "re-enabled")
1269- }
1270- return nil
1271- }
1272- // module is enabled, disable its hooks
1273- mm.DisableModuleHooks(moduleName)
1274-
1275- module := mm.GetModule(moduleName)
1276- // check for nil to prevent operator from panicking
1277- if module == nil {
1278- return fmt.Errorf("couldn't get %s module configuration", moduleName)
1279- }
1280-
1281- // deregister modules' hooks
1282- module.DeregisterHooks()
1283-
1284- // upsert a new module in the moduleset
1285- mm.modules.Add(basicModule)
1286-
1287- // check if module is enabled via enabled scripts
1288- isEnabled, err := basicModule.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{})
1289- if err != nil {
1290- return err
1291- }
1292-
1293- ev := events.ModuleEvent{
1294- ModuleName: moduleName,
1295- EventType: events.ModuleEnabled,
1296- }
1297-
1298- if isEnabled {
1299- // enqueue module startup sequence if it is enabled
1300- err := mm.PushRunModuleTask(moduleName, false)
1301- if err != nil {
1302- return err
1303- }
1304- } else {
1305- ev.EventType = events.ModuleDisabled
1306- mm.PushDeleteModuleTask(moduleName)
1307- // modules is disabled - update modulemanager's state
1308- mm.DeleteEnabledModuleName(moduleName)
1309- }
1310- mm.SendModuleEvent(ev)
1311- return nil
1312- }
1313-
1314- // module doesn't exist
1315- mm.modules.Add(basicModule)
1316-
1317- // a new module requires to be registered
1318-
1319- mm.SendModuleEvent(events.ModuleEvent{
1320- ModuleName: moduleName,
1321- EventType: events.ModuleRegistered,
1322- })
1323-
1324- // get kube config for the module to check if it has enabled: true
1325- moduleKubeConfigEnabled := mm.dependencies.KubeConfigManager.IsModuleEnabled(moduleName)
1326- // if module isn't explicitly enabled in the module kube config - exit
1327-
1328- if moduleKubeConfigEnabled == nil || (moduleKubeConfigEnabled != nil && !*moduleKubeConfigEnabled) {
1329- return nil
1330- }
1331-
1332- mm.AddEnabledModuleByConfigName(moduleName)
1333-
1334- // if the module kube config has enabled true, check enable script
1335- isEnabled, err := basicModule.RunEnabledScript(mm.TempDir, mm.GetEnabledModuleNames(), map[string]string{})
1336-
1337- if err != nil {
1338- return err
1339- }
1340-
1341- if isEnabled {
1342- err := mm.UpdateModuleKubeConfig(moduleName)
1343- if err != nil {
1344- return err
1345- }
1346- log.Infof("Push ConvergeModules task because %q Module was enabled", moduleName)
1347- mm.PushConvergeModulesTask(moduleName, "registered-and-enabled")
1348- mm.SendModuleEvent(events.ModuleEvent{
1349- ModuleName: moduleName,
1350- EventType: events.ModuleEnabled,
1351- })
1352- }
1353-
1354- return nil
1355- }
1356-
1357- // PushDeleteModule pushes moduleDelete task for a module into the main queue
1358- // TODO: EXPERIMENTAL
1359- /*func (mm *ModuleManager) PushDeleteModuleTask(moduleName string) {
1360- // check if there is already moduleDelete task in the main queue for the module
1361- if queueHasPendingModuleDeleteTask(mm.dependencies.TaskQueues.GetMain(), moduleName) {
1362- return
1197+ // If it's a new module - converges all modules
1198+ func (mm * ModuleManager ) RegisterModule (ctx context.Context , moduleName string ) error {
1199+ ctx , span := otel .Tracer (moduleManagerServiceName ).Start (ctx , "RegisterModule" )
1200+ defer span .End ()
1201+
1202+ span .SetAttributes (attribute .String ("module" , moduleName ))
1203+
1204+ mm .logger .Debug ("register the module" , slog .String ("module" , moduleName ))
1205+
1206+ if module := mm .GetModule (moduleName ); module != nil {
1207+ mm .logger .Debug ("unregister the module" , slog .String ("module" , moduleName ))
1208+
1209+ mm .DisableModuleHooks (moduleName )
1210+ mm .dependencies .HelmResourcesManager .StopMonitor (moduleName )
1211+ module .ResetState ()
1212+ module .DeregisterHooks ()
13631213 }
13641214
1365- newTask := sh_task.NewTask(task.ModuleDelete).
1366- WithQueueName("main").
1367- WithMetadata(task.HookMetadata{
1368- EventDescription: "ModuleManager-Delete-Module",
1369- ModuleName: moduleName,
1370- })
1371- newTask.SetProp("triggered-by", "ModuleManager")
1215+ module , err := mm .moduleLoader .LoadModule (moduleName )
1216+ if err != nil {
1217+ return fmt .Errorf ("load the module '%s': %w" , moduleName , err )
1218+ }
13721219
1373- mm.dependencies.TaskQueues.GetMain().AddLast(newTask.WithQueuedAt(time.Now()))
1220+ // load and registry global hooks
1221+ dep := & hooks.HookExecutionDependencyContainer {
1222+ HookMetricsStorage : mm .dependencies .HookMetricStorage ,
1223+ KubeConfigManager : mm .dependencies .KubeConfigManager ,
1224+ KubeObjectPatcher : mm .dependencies .KubeObjectPatcher ,
1225+ MetricStorage : mm .dependencies .MetricStorage ,
1226+ GlobalValuesGetter : mm .global ,
1227+ }
13741228
1375- log.Infof("Push ConvergeModules task because %q Module was disabled", moduleName)
1376- mm.PushConvergeModulesTask(moduleName, "disabled")
1377- }
1229+ module .WithDependencies (dep )
13781230
1379- // PushConvergeModulesTask pushes ConvergeModulesTask into the main queue to update all modules on a module enable/disable event
1380- // TODO: EXPERIMENTAL
1381- func (mm *ModuleManager) PushConvergeModulesTask(moduleName, moduleState string) {
1382- newConvergeTask := sh_task.NewTask(task.ConvergeModules).
1383- WithQueueName("main").
1384- WithMetadata(task.HookMetadata{
1385- EventDescription: fmt.Sprintf("ModuleManager-%s-Module", moduleState),
1386- ModuleName: moduleName,
1387- }).
1388- WithQueuedAt(time.Now())
1389- newConvergeTask.SetProp("triggered-by", "ModuleManager")
1390- newConvergeTask.SetProp(converge.ConvergeEventProp, converge.ReloadAllModules)
1231+ // add module to set
1232+ mm .modules .Add (module )
13911233
1392- mm.dependencies.TaskQueues.GetMain().AddLast(newConvergeTask.WithQueuedAt(time.Now()))
1393- }
1234+ // add module to scheduler
1235+ if err = mm .moduleScheduler .AddModuleVertex (module ); err != nil {
1236+ return fmt .Errorf ("add module vertex: %w" , err )
1237+ }
13941238
1395- // queueHasPendingModuleDeleteTask returns true if queue has pending tasks
1396- // with the type "ModuleDelete" related to the module "moduleName"
1397- // TODO: EXPERIMENTAL
1398- func queueHasPendingModuleDeleteTask(q *queue.TaskQueue, moduleName string) bool {
1399- if q == nil {
1400- return false
1239+ // reinit scheduler
1240+ if err = mm .moduleScheduler .Initialize (); err != nil {
1241+ return fmt .Errorf ("initialize module scheduler: %w" , err )
14011242 }
1402- modules := modulesWithPendingTasks(q, task.ModuleDelete)
1403- meta, has := modules[moduleName]
1404- return has && meta.doStartup
1405- } */
1243+
1244+ mm .SendModuleEvent (events.ModuleEvent {
1245+ ModuleName : module .Name ,
1246+ EventType : events .ModuleRegistered ,
1247+ })
1248+
1249+ // send event to trigger reconverge
1250+ mm .SchedulerEventCh () <- extenders.ExtenderEvent {
1251+ ExtenderName : dynamic_extender .Name ,
1252+ EncapsulatedEvent : dynamic_extender.DynamicExtenderEvent {},
1253+ }
1254+
1255+ return nil
1256+ }
14061257
14071258// registerModules load all available modules from modules directory.
14081259func (mm * ModuleManager ) registerModules (scriptEnabledExtender * script_extender.Extender ) error {
0 commit comments