Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 45 additions & 18 deletions src/IntuneCD/intunecdlib/BaseGraphModule.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import time
import uuid
from copy import deepcopy
from uuid import uuid4

import requests
Expand Down Expand Up @@ -660,19 +661,24 @@ def batch_intents(self, data: list) -> dict:

return intent_values

def get_object_assignment(self, data_id: str, responses: list) -> list:
def get_object_assignment(
self, data_id: str, responses: list, preserve_group_id: bool = False
) -> list:
"""
Get the object assignment for the object ID.

:param data_id: Id of the object to get the assignment for
:param responses: List of responses from the batch request
:param preserve_group_id: Preserve group IDs for update comparisons
:return: List of assignments for the object
"""
if not responses:
return []
remove_keys = {"id", "groupId", "sourceId"}
remove_keys = {"id", "sourceId"}
if not preserve_group_id:
remove_keys.add("groupId")
assignments_list = [
val
deepcopy(val)
for list in responses
if list and "value" in list
if data_id in list["@odata.context"]
Expand All @@ -682,6 +688,10 @@ def get_object_assignment(self, data_id: str, responses: list) -> list:
for k in remove_keys:
value.pop(k, None)
value["target"].pop(k, None)
if preserve_group_id:
value["target"].pop("groupName", None)
value["target"].pop("groupType", None)
value["target"].pop("membershipRule", None)

return assignments_list

Expand Down Expand Up @@ -743,6 +753,12 @@ def get_added_removed(self, diff_object: dict) -> list:
):
target = diff_object[root]["target"]["groupId"]

if (
diff_object[root]["target"]["@odata.type"]
== "#microsoft.graph.exclusionGroupAssignmentTarget"
):
target = diff_object[root]["target"]["groupId"]

if (
diff_object[root]["target"]["@odata.type"]
== "#microsoft.graph.allDevicesAssignmentTarget"
Expand Down Expand Up @@ -773,12 +789,7 @@ def update_assignment(
:return: If update is true, return repo data, else return None
"""

diff = DeepDiff(intune_data, repo_data, ignore_order=True)
added = diff.get("iterable_item_added", {})
update = False

if not diff:
return None
update = not repo_data

for val in repo_data:
# Request group id based on group name
Expand Down Expand Up @@ -852,20 +863,36 @@ def update_assignment(
):
update = True

diff = DeepDiff(intune_data, repo_data, ignore_order=True)

if not diff:
return None

if update is True:
# Print added assignments
added = {
key: value
for key, value in added.items()
if "target" in value and "groupName" not in value["target"]
}
added = diff.get("iterable_item_added", {})
removed = diff.get("iterable_item_removed", {})
changed = diff.get("values_changed", {})

self.log(msg="Updating assignments")

if added:
self.log(msg="Updating assignments, added assignments:")
self.log(msg="Added assignments:")
updates = self.get_added_removed(added)
for update in updates:
self.log(msg=update)
return repo_data
return None

if removed:
self.log(msg="Removed assignments:")
updates = self.get_added_removed(removed)
for update in updates:
self.log(msg=update)

if changed:
self.log(msg="Changed assignments detected")

return repo_data

return None

def make_azure_request(
self,
Expand Down
4 changes: 2 additions & 2 deletions src/IntuneCD/intunecdlib/BaseUpdateModule.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ def handle_assignments(
intune_id (str): The intune configuration id to use
"""
intune_assignment_data = self.get_object_assignment(
intune_id, intune_assignments
intune_id, intune_assignments, preserve_group_id=True
)
assignment_update = self.update_assignment(
repo_assignments, intune_assignment_data, self.create_groups
Expand Down Expand Up @@ -529,7 +529,7 @@ def handle_iterable_assignments(
intune_id (str): The intune configuration id to use
"""
intune_assignment_data = self.get_object_assignment(
intune_id, intune_assignments
intune_id, intune_assignments, preserve_group_id=True
)
assignment_update = self.update_assignment(
repo_assignments, intune_assignment_data, self.create_groups
Expand Down
164 changes: 164 additions & 0 deletions tests/test_base_graph_module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# -*- coding: utf-8 -*-
import unittest
from unittest.mock import patch

from src.IntuneCD.intunecdlib.BaseGraphModule import BaseGraphModule


class TestBaseGraphModuleAssignments(unittest.TestCase):
"""Tests for assignment comparison and normalization."""

def setUp(self):
self.module = BaseGraphModule()

def test_get_object_assignment_preserves_group_id_for_update_comparison(self):
responses = [
{
"@odata.context": (
"https://graph.microsoft.com/beta/deviceManagement/"
"configurationPolicies/policy-id/assignments"
),
"value": [
{
"id": "assignment-id",
"sourceId": "policy-id",
"target": {
"@odata.type": "#microsoft.graph.groupAssignmentTarget",
"groupId": "group-id",
"groupName": "Test Group",
"groupType": "StaticMembership",
},
}
],
}
]

backup_assignment = self.module.get_object_assignment("policy-id", responses)
update_assignment = self.module.get_object_assignment(
"policy-id", responses, preserve_group_id=True
)

self.assertNotIn("groupId", backup_assignment[0]["target"])
self.assertEqual(backup_assignment[0]["target"]["groupName"], "Test Group")
self.assertEqual(update_assignment[0]["target"]["groupId"], "group-id")
self.assertNotIn("groupName", update_assignment[0]["target"])

@patch.object(BaseGraphModule, "log")
def test_update_assignment_returns_repo_data_for_removal_only_diff(self, mock_log):
repo_data = [
{
"intent": "Include",
"target": {
"@odata.type": "#microsoft.graph.allDevicesAssignmentTarget"
},
}
]
intune_data = [
{
"intent": "Include",
"target": {
"@odata.type": "#microsoft.graph.allDevicesAssignmentTarget"
},
},
{
"intent": "Exclude",
"target": {
"@odata.type": "#microsoft.graph.groupAssignmentTarget",
"groupId": "removed-group-id",
},
},
]

result = self.module.update_assignment(repo_data, intune_data, False)

self.assertIs(result, repo_data)
mock_log.assert_any_call(msg="Updating assignments")
mock_log.assert_any_call(msg="Removed assignments:")

@patch.object(BaseGraphModule, "log")
def test_update_assignment_returns_empty_repo_data_when_all_assignments_removed(
self, mock_log
):
repo_data = []
intune_data = [
{
"intent": "Exclude",
"target": {
"@odata.type": "#microsoft.graph.exclusionGroupAssignmentTarget",
"groupId": "removed-group-id",
},
}
]

result = self.module.update_assignment(repo_data, intune_data, False)

self.assertIs(result, repo_data)
mock_log.assert_any_call(msg="Updating assignments")
mock_log.assert_any_call(msg="Removed assignments:")
mock_log.assert_any_call(
msg="intent: Exclude, Filter ID: , Filter Type: , "
"target: removed-group-id"
)

@patch.object(BaseGraphModule, "log")
def test_update_assignment_returns_repo_data_for_assignment_type_change(
self, mock_log
):
repo_data = [
{
"intent": "Include",
"target": {
"@odata.type": "#microsoft.graph.allDevicesAssignmentTarget"
},
}
]
intune_data = [
{
"intent": "Include",
"target": {
"@odata.type": "#microsoft.graph.groupAssignmentTarget",
"groupId": "old-group-id",
},
}
]

result = self.module.update_assignment(repo_data, intune_data, False)

self.assertIs(result, repo_data)
mock_log.assert_any_call(msg="Updating assignments")
mock_log.assert_any_call(msg="Changed assignments detected")

@patch.object(BaseGraphModule, "make_graph_request")
def test_update_assignment_ignores_same_group_after_group_name_resolution(
self, mock_make_graph_request
):
repo_data = [
{
"intent": "Include",
"target": {
"@odata.type": "#microsoft.graph.groupAssignmentTarget",
"groupName": "Test Group",
"groupType": "StaticMembership",
},
}
]
intune_data = [
{
"intent": "Include",
"target": {
"@odata.type": "#microsoft.graph.groupAssignmentTarget",
"groupId": "group-id",
},
}
]
mock_make_graph_request.return_value = {"value": [{"id": "group-id"}]}

result = self.module.update_assignment(repo_data, intune_data, False)

self.assertIsNone(result)
self.assertEqual(repo_data[0]["target"]["groupId"], "group-id")
self.assertNotIn("groupName", repo_data[0]["target"])


if __name__ == "__main__":
unittest.main()
Loading