diff --git a/hooks/ordrd-group-x/main.go b/hooks/ordrd-group-x/main.go index a58a0a1..03c12b3 100644 --- a/hooks/ordrd-group-x/main.go +++ b/hooks/ordrd-group-x/main.go @@ -22,6 +22,12 @@ var ( // baseGroup is obtained from a flag and used for the shared posixGroup. baseGroup string + + // userObjectClasses is the objectClass list written to user entries in the + // target LDAP. Configurable via --userObjectClasses so the hook can run + // against targets that don't have the helxUser schema loaded. + userObjectClassesFlag string + userObjectClasses []string ) // HookRequest represents the input payload for the /hook endpoint. @@ -180,15 +186,19 @@ func processORDRDGroup(req HookRequest) HookResponse { // Emit one extra transformed entry per member that patches their groups attribute. // Because groups is a merge attribute in the main service, these accumulate correctly // (e.g. a user in both "users" and "eagle" ends up with groups: [users, eagle]). + // Only emitted when helxUser is in the objectClass list; otherwise the destination + // LDAP schema won't have the groups attribute type defined. transformedEntries := []map[string]interface{}{transformed} - for _, uid := range memberUids { - userGroupPatch := map[string]interface{}{ - "dn": fmt.Sprintf("uid=%s,ou=users,dc=example,dc=org", uid), - "content": map[string]interface{}{ - "groups": []interface{}{groupname}, - }, + if hasHelxUser() { + for _, uid := range memberUids { + userGroupPatch := map[string]interface{}{ + "dn": fmt.Sprintf("uid=%s,ou=users,dc=example,dc=org", uid), + "content": map[string]interface{}{ + "groups": []interface{}{groupname}, + }, + } + transformedEntries = append(transformedEntries, userGroupPatch) } - transformedEntries = append(transformedEntries, userGroupPatch) } return HookResponse{ @@ -219,14 +229,17 @@ func processUNCUser(req HookRequest) HookResponse { "displayName": req.Content["displayName"], "gidNumber": baseGid, // Use the global baseGid. "givenName": req.Content["givenName"], - "groups": []interface{}{baseGroup}, // every user belongs to the base group "homeDirectory": fmt.Sprintf("/home/%s", uid), - "objectClass": []string{"top", "inetOrgPerson", "posixAccount", "helxUser"}, + "objectClass": userObjectClasses, "ou": "users", "sn": req.Content["sn"], "uid": uid, "uidNumber": req.Content["uidNumber"], } + // Only include groups when the destination schema has helxUser loaded. + if hasHelxUser() { + newContent["groups"] = []interface{}{baseGroup} + } transformed := map[string]interface{}{ "dn": newDN, @@ -303,6 +316,18 @@ func processPosixGroup(req HookRequest) HookResponse { } } +// hasHelxUser reports whether "helxUser" is present in the configured +// userObjectClasses. The groups attribute is only defined in the helxUser +// schema extension, so it must not be written to destinations that lack it. +func hasHelxUser() bool { + for _, oc := range userObjectClasses { + if oc == "helxUser" { + return true + } + } + return false +} + // extractGroupName extracts the groupname from a DN expected in the form: // "cn=unc:app:renci:{{ groupname }},ou=Groups,dc=unc,dc=edu" func extractGroupName(dn string) string { @@ -342,8 +367,17 @@ func main() { // Accept the baseGid flag. Default value is "200" (adjust as needed). flag.StringVar(&baseGid, "baseGid", "200", "Base gidNumber to use for UNC Users") flag.StringVar(&baseGroup, "baseGroup", "users", "Base group name for all UNC Users") + flag.StringVar(&userObjectClassesFlag, "userObjectClasses", + "top,inetOrgPerson,posixAccount,helxUser", + "Comma-separated objectClass list to assign to synced user entries") flag.Parse() + for _, oc := range strings.Split(userObjectClassesFlag, ",") { + if oc = strings.TrimSpace(oc); oc != "" { + userObjectClasses = append(userObjectClasses, oc) + } + } + e := echo.New() // Register the /hook POST endpoint. diff --git a/hooks/unc-group-x/main.go b/hooks/unc-group-x/main.go index 54ab9e2..d93e4ec 100644 --- a/hooks/unc-group-x/main.go +++ b/hooks/unc-group-x/main.go @@ -201,15 +201,19 @@ func processORDRDGroup(req HookRequest) HookResponse { // Emit one extra transformed entry per member that patches their groups attribute. // Because groups is a merge attribute in the main service, these accumulate correctly // (e.g. a user in both "users" and "eagle" ends up with groups: [users, eagle]). + // Only emitted when helxUser is in the objectClass list; otherwise the destination + // LDAP schema won't have the groups attribute type defined. transformedEntries := []map[string]interface{}{transformed} - for _, pid := range memberPids { - userGroupPatch := map[string]interface{}{ - "dn": fmt.Sprintf("uid=$pidUidMap.%s,ou=users,dc=example,dc=org", pid), - "content": map[string]interface{}{ - "groups": []interface{}{groupname}, - }, + if hasHelxUser() { + for _, pid := range memberPids { + userGroupPatch := map[string]interface{}{ + "dn": fmt.Sprintf("uid=$pidUidMap.%s,ou=users,dc=example,dc=org", pid), + "content": map[string]interface{}{ + "groups": []interface{}{groupname}, + }, + } + transformedEntries = append(transformedEntries, userGroupPatch) } - transformedEntries = append(transformedEntries, userGroupPatch) } return HookResponse{ @@ -254,17 +258,20 @@ func processUNCUser(req HookRequest) HookResponse { // Build the transformed content. newContent := map[string]interface{}{ - "cn": req.Content["cn"], - "displayName": req.Content["displayName"], - "gidNumber": baseGid, // Use the global baseGid. - "givenName": req.Content["givenName"], - "groups": []interface{}{baseGroup}, // every user belongs to the base group - "homeDirectory": fmt.Sprintf("/home/%s", uid), - "objectClass": userObjectClasses, - "ou": "users", - "sn": req.Content["sn"], - "uid": uid, - "uidNumber": req.Content["uidNumber"], + "cn": req.Content["cn"], + "displayName": req.Content["displayName"], + "gidNumber": baseGid, // Use the global baseGid. + "givenName": req.Content["givenName"], + "homeDirectory": fmt.Sprintf("/home/%s", uid), + "objectClass": userObjectClasses, + "ou": "users", + "sn": req.Content["sn"], + "uid": uid, + "uidNumber": req.Content["uidNumber"], + } + // Only include groups when the destination schema has helxUser loaded. + if hasHelxUser() { + newContent["groups"] = []interface{}{baseGroup} } transformed := map[string]interface{}{ @@ -397,6 +404,18 @@ func extractCN(dn string) string { return "" } +// hasHelxUser reports whether "helxUser" is present in the configured +// userObjectClasses. The groups attribute is only defined in the helxUser +// schema extension, so it must not be written to destinations that lack it. +func hasHelxUser() bool { + for _, oc := range userObjectClasses { + if oc == "helxUser" { + return true + } + } + return false +} + // copyMap creates a shallow copy of a map. func copyMap(orig map[string]interface{}) map[string]interface{} { newMap := make(map[string]interface{}) diff --git a/main.go b/main.go index 5054e34..e62c352 100644 --- a/main.go +++ b/main.go @@ -30,10 +30,11 @@ import ( // LDAPConfig holds connection details for one LDAP server. type LDAPConfig struct { - URL string `yaml:"url"` - BindDN string `yaml:"bind_dn"` - BindPassword string `yaml:"bind_password"` - BaseDN string `yaml:"base_dn"` + URL string `yaml:"url"` + BindDN string `yaml:"bind_dn"` + BindPassword string `yaml:"bind_password"` + BaseDN string `yaml:"base_dn"` + ExcludeAttributes []string `yaml:"exclude_attributes"` } // DatabaseConfig holds database connection details. @@ -54,6 +55,26 @@ type HookRetryConfig struct { MaxDelayMs int `yaml:"max_delay_ms"` } +// PluginRetryConfig holds retry configuration for plugin Apply calls. +type PluginRetryConfig struct { + MaxAttempts int `yaml:"max_attempts"` + InitialDelayMs int `yaml:"initial_delay_ms"` + MaxDelayMs int `yaml:"max_delay_ms"` +} + +// PluginConfig describes one configured plugin instance. +type PluginConfig struct { + Name string `yaml:"name"` + Enabled bool `yaml:"enabled"` + Options map[string]interface{} `yaml:"options"` +} + +// PluginsConfig groups all plugin configuration. +type PluginsConfig struct { + Retry PluginRetryConfig `yaml:"retry"` + Enabled []PluginConfig `yaml:"enabled"` +} + // Config holds the configuration for both source and target LDAP servers. type Config struct { Source LDAPConfig `yaml:"source"` @@ -61,6 +82,7 @@ type Config struct { Hooks []string `yaml:"hooks"` Database DatabaseConfig `yaml:"database"` HookRetry HookRetryConfig `yaml:"hook_retry"` + Plugins PluginsConfig `yaml:"plugins"` } // SearchSpec represents a running search instance. @@ -146,7 +168,7 @@ var db *sql.DB // ldapStore is the function used to write a transformed entry to the destination // LDAP. It is a variable so tests can replace it with a mock without a live server. -var ldapStore func(*TransformedEntry) error +var ldapStore func(*TransformedEntry) (SyncOp, error) // handleEntryWindowHook is called in handleEntry between the first lock release // and the second lock acquisition — the exact window where the two-phase race @@ -155,9 +177,10 @@ var ldapStore func(*TransformedEntry) error var handleEntryWindowHook func() type pendingEntry struct { - entry *TransformedEntry - deps map[string]struct{} - rawDeps []string + entry *TransformedEntry + deps map[string]struct{} + rawDeps []string + searchID string } type dependencyState struct { @@ -330,6 +353,33 @@ func isMergeAttr(attr string) bool { return ok } +// dropUndefinedAttr inspects err for LDAP result code 17 (Undefined Attribute +// Type), parses the offending attribute name from the server diagnostic, adds +// it to skip, and logs a warning. Returns true if the caller should retry the +// operation without that attribute; returns false if the error is not retryable +// (wrong code, unparseable message, objectClass, or already skipped). +func dropUndefinedAttr(err error, dn string, skip map[string]struct{}) bool { + ldapErr, ok := err.(*ldap.Error) + if !ok || ldapErr.ResultCode != ldap.LDAPResultUndefinedAttributeType { + return false + } + if ldapErr.Err == nil { + return false + } + // OpenLDAP formats the diagnostic as ": attribute type undefined". + parts := strings.SplitN(ldapErr.Err.Error(), ":", 2) + attr := strings.TrimSpace(parts[0]) + if attr == "" || strings.EqualFold(attr, "objectClass") { + return false + } + if _, alreadySkipped := skip[attr]; alreadySkipped { + return false + } + logger.Warn("Attribute not in destination schema, dropping from write", "DN", dn, "Attr", attr) + skip[attr] = struct{}{} + return true +} + func isSliceValue(val interface{}) bool { switch val.(type) { case []interface{}, []string: @@ -621,7 +671,7 @@ func collectMissingBindings(entry *TransformedEntry, deps []string, bindings map return keys } -func (d *dependencyState) handleEntry(entry *TransformedEntry, deps []string) { +func (d *dependencyState) handleEntry(entry *TransformedEntry, deps []string, searchID string) { parentKey := normalizeDN(entry.DN) if parentKey == "" { logger.Error("Transformed entry has empty DN; skipping dependency processing") @@ -637,6 +687,11 @@ func (d *dependencyState) handleEntry(entry *TransformedEntry, deps []string) { if len(existing.rawDeps) > 0 { rawDeps = append(rawDeps, existing.rawDeps...) } + // Inherit the searchID from the earlier pending entry if the caller + // did not supply one, so plugin events keep a stable origin. + if searchID == "" { + searchID = existing.searchID + } for depKey := range existing.deps { parents := d.reverse[depKey] if parents != nil { @@ -698,11 +753,12 @@ func (d *dependencyState) handleEntry(entry *TransformedEntry, deps []string) { if len(missing) == 0 && !entryMissing && !depsMissing { d.mu.Unlock() - if err := ldapStore(resolvedEntry); err != nil { + op, err := ldapStore(resolvedEntry) + if err != nil { logger.Error("Error storing entry in destination LDAP", "DN", resolvedEntry.DN, "Err", err) return } - d.markSyncedAndRelease(resolvedEntry.DN) + d.markSyncedAndRelease(resolvedEntry.DN, searchID, resolvedEntry.Content, op) return } @@ -730,9 +786,10 @@ func (d *dependencyState) handleEntry(entry *TransformedEntry, deps []string) { } d.pending[parentKey] = &pendingEntry{ - entry: entry, - deps: missing, - rawDeps: rawDeps, + entry: entry, + deps: missing, + rawDeps: rawDeps, + searchID: searchID, } for depKey := range missing { parents := d.reverse[depKey] @@ -802,11 +859,11 @@ func (d *dependencyState) reprocessPending() { continue } logger.Debug("Reprocessing pending entry", "DN", pending.entry.DN, "RawDeps", len(pending.rawDeps)) - d.handleEntry(pending.entry, pending.rawDeps) + d.handleEntry(pending.entry, pending.rawDeps, pending.searchID) } } -func (d *dependencyState) markSyncedAndRelease(dn string) { +func (d *dependencyState) markSyncedAndRelease(dn string, searchID string, content map[string]interface{}, op SyncOp) { dnKey := normalizeDN(dn) if dnKey == "" { return @@ -889,17 +946,32 @@ func (d *dependencyState) markSyncedAndRelease(dn string) { "Deferred entry still missing bindings on release", "DN", pending.entry.DN, ) - d.handleEntry(pending.entry, pending.rawDeps) + d.handleEntry(pending.entry, pending.rawDeps, pending.searchID) continue } - if err := ldapStore(resolvedEntry); err != nil { + pendingOp, err := ldapStore(resolvedEntry) + if err != nil { logger.Error("Error storing deferred entry in destination LDAP", "DN", resolvedEntry.DN, "Err", err) continue } logger.Info("Storing deferred entry in destination LDAP", "DN", resolvedEntry.DN) - d.markSyncedAndRelease(resolvedEntry.DN) + d.markSyncedAndRelease(resolvedEntry.DN, pending.searchID, resolvedEntry.Content, pendingOp) } } + + // Plugin dispatch fires only after the entry is durably in target LDAP and + // outside the dependencyState mutex. An empty op signals "no actual write + // happened" (e.g. duplicate markSynced for an already-synced DN), in which + // case there is nothing to announce. + if op != "" { + dispatchSyncEvent(SyncEvent{ + SearchID: searchID, + DN: dn, + Content: content, + Op: op, + Timestamp: time.Now(), + }) + } } // initLogger initializes the logger using log/slog. @@ -992,7 +1064,7 @@ func performLDAPSearch(l *ldap.Conn, baseDN, filter string) (*ldap.SearchResult, return l.Search(searchRequest) } -func storeDestinationLDAP(entry *TransformedEntry) error { +func storeDestinationLDAP(entry *TransformedEntry) (SyncOp, error) { lock := getDNLock(entry.DN) lock.Lock() defer lock.Unlock() @@ -1000,17 +1072,18 @@ func storeDestinationLDAP(entry *TransformedEntry) error { // Connect to destination LDAP. l, err := ldap.DialURL(config.Target.URL) if err != nil { - return err + return "", err } defer l.Close() // Bind with destination credentials. if err = l.Bind(config.Target.BindDN, config.Target.BindPassword); err != nil { - return err + return "", err } // Check if the entry exists. - searchAttrs := []string{"dn"} + // Fetch objectClass so we can guard schema-extension attributes (e.g. groups). + searchAttrs := []string{"dn", "objectClass"} if len(mergeAttributes) > 0 { for attr := range mergeAttributes { searchAttrs = append(searchAttrs, attr) @@ -1034,7 +1107,7 @@ func storeDestinationLDAP(entry *TransformedEntry) error { // Treat it as if no entry was found. sr = &ldap.SearchResult{Entries: []*ldap.Entry{}} } else { - return err + return "", err } } @@ -1060,18 +1133,30 @@ func storeDestinationLDAP(entry *TransformedEntry) error { // If the entry doesn't exist, add it. if len(sr.Entries) == 0 { - addReq := ldap.NewAddRequest(entry.DN, nil) - for attr, values := range attributes { - addReq.Attribute(attr, values) - } - // Optionally, ensure an objectClass is set. - if _, exists := attributes["objectClass"]; !exists { - addReq.Attribute("objectClass", []string{"top", "inetOrgPerson"}) + skip := make(map[string]struct{}) + for range len(attributes) + 1 { + addReq := ldap.NewAddRequest(entry.DN, nil) + for attr, values := range attributes { + if _, s := skip[attr]; !s { + addReq.Attribute(attr, values) + } + } + if _, exists := attributes["objectClass"]; !exists { + addReq.Attribute("objectClass", []string{"top", "inetOrgPerson"}) + } + err = l.Add(addReq) + if err == nil { + break + } + if !dropUndefinedAttr(err, entry.DN, skip) { + return "", err + } } - if err = l.Add(addReq); err != nil { - return err + if err != nil { + return "", err } logger.Info("Added entry to destination LDAP", "DN", entry.DN) + return SyncOpCreated, nil } else { entryData := sr.Entries[0] for attr, values := range attributes { @@ -1090,16 +1175,28 @@ func storeDestinationLDAP(entry *TransformedEntry) error { attributes[attr] = mergeUnique(existing, values) } // If the entry exists, update it. - modReq := ldap.NewModifyRequest(entry.DN, nil) - for attr, values := range attributes { - modReq.Replace(attr, values) + skip := make(map[string]struct{}) + for range len(attributes) + 1 { + modReq := ldap.NewModifyRequest(entry.DN, nil) + for attr, values := range attributes { + if _, s := skip[attr]; !s { + modReq.Replace(attr, values) + } + } + err = l.Modify(modReq) + if err == nil { + break + } + if !dropUndefinedAttr(err, entry.DN, skip) { + return "", err + } } - if err = l.Modify(modReq); err != nil { - return err + if err != nil { + return "", err } logger.Info("Modified entry in destination LDAP", "DN", entry.DN) + return SyncOpUpdated, nil } - return nil } // ldapSearchAndSync performs the LDAP search on the source server and synchronizes the results. @@ -1157,7 +1254,7 @@ func ldapSearchAndSync(id, filter, baseDN string, refresh int, oneshot bool, sto } // processHookResponse is a stub for processing the hook response. -func processHookResponse(hookResp HookResponse) { +func processHookResponse(hookResp HookResponse, sourceSearchID string) { // Log the parsed hook response values. logger.Debug("Processing Hook response", "Transformed", hookResp.Transformed, "Derived", hookResp.Derived, "Reset", hookResp.Reset) @@ -1171,7 +1268,7 @@ func processHookResponse(hookResp HookResponse) { for i := range hookResp.Transformed { transformed := hookResp.Transformed[i] logger.Debug("Processing transformed hook response for DN", "DN", transformed.DN) - dependencyTracker.handleEntry(&transformed, hookResp.Dependencies) + dependencyTracker.handleEntry(&transformed, hookResp.Dependencies, sourceSearchID) } } else { logger.Info("No transformed data in hook response") @@ -1296,7 +1393,7 @@ func postToHookWithRetry(hookURL string, payload []byte) (*http.Response, error) } // sendHooks posts the LDAP result to each URL specified in config.Hooks. -func sendHooks(result LDAPResult) { +func sendHooks(result LDAPResult, sourceSearchID string) { payload, err := json.Marshal(result) if err != nil { logger.Error("Error marshalling hook payload for DN", "DN", result.DN, "Err", err) @@ -1324,7 +1421,7 @@ func sendHooks(result LDAPResult) { } for _, hookResp := range hookResps { - processHookResponse(hookResp) + processHookResponse(hookResp, sourceSearchID) } }(url) } @@ -1383,7 +1480,7 @@ func processLDAPEntry(id string, entry *ldap.Entry, oneshot bool) { } if shouldSend { - sendHooks(newResult) + sendHooks(newResult, id) } } @@ -1745,6 +1842,8 @@ func main() { os.Exit(1) } + initPluginRegistry(config.Plugins) + // Initialize database if enabled in config if config.Database.Enabled { if err := initDB(config.Database); err != nil {