Skip to content

Commit 6e74c3a

Browse files
Update approach to logging in connector component (#2809)
Co-authored-by: Jedr Blaszyk <[email protected]>
1 parent 4f6ca9b commit 6e74c3a

File tree

9 files changed

+293
-62
lines changed

9 files changed

+293
-62
lines changed

connectors/agent/cli.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
from elastic_agent_client.util.async_tools import (
1111
sleeps_for_retryable,
1212
)
13-
from elastic_agent_client.util.logger import logger
1413

1514
from connectors.agent.component import ConnectorsAgentComponent
15+
from connectors.agent.logger import get_logger
16+
17+
logger = get_logger("cli")
1618

1719

1820
def main(args=None):

connectors/agent/component.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111
from elastic_agent_client.service.checkin import CheckinV2Service
1212

1313
from connectors.agent.config import ConnectorsAgentConfigurationWrapper
14+
from connectors.agent.logger import get_logger
1415
from connectors.agent.protocol import ConnectorActionHandler, ConnectorCheckinHandler
1516
from connectors.agent.service_manager import ConnectorServiceManager
1617
from connectors.services.base import MultiService
1718

19+
logger = get_logger("component")
20+
1821
CONNECTOR_SERVICE = "connector-service"
1922

2023

@@ -51,6 +54,7 @@ async def run(self):
5154
5255
Additionally services for handling Check-in and Actions will be started to implement the protocol correctly.
5356
"""
57+
logger.info("Starting connectors agent component")
5458
client = new_v2_from_reader(self.buffer, self.ver, self.opts)
5559
action_handler = ConnectorActionHandler()
5660
self.connector_service_manager = ConnectorServiceManager(self.config_wrapper)
@@ -71,4 +75,5 @@ def stop(self, sig):
7175
7276
Attempts to gracefully shutdown the services that are running under the component.
7377
"""
78+
logger.info("Shutting down connectors agent component")
7479
self.multi_service.shutdown(sig)

connectors/agent/config.py

Lines changed: 75 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
#
66
import base64
77

8+
from connectors.agent.logger import get_logger
89
from connectors.config import add_defaults
10+
from connectors.utils import nested_get_from_dict
11+
12+
logger = get_logger("config")
913

1014

1115
class ConnectorsAgentConfigurationWrapper:
@@ -25,10 +29,11 @@ def __init__(self):
2529
Service config and specific config coming from Agent.
2630
"""
2731
self._default_config = {
28-
"_force_allow_native": True,
2932
"service": {
33+
"log_level": "INFO",
3034
"_use_native_connector_api_keys": False,
3135
},
36+
"_force_allow_native": True,
3237
"native_service_types": [
3338
"azure_blob_storage",
3439
"box",
@@ -60,7 +65,7 @@ def __init__(self):
6065

6166
self.specific_config = {}
6267

63-
def try_update(self, source):
68+
def try_update(self, unit):
6469
"""Try update the configuration and see if it changed.
6570
6671
This method takes the check-in event coming from Agent and checks if config needs an update.
@@ -69,33 +74,92 @@ def try_update(self, source):
6974
the method returns False.
7075
"""
7176

77+
source = unit.config.source
78+
7279
# TODO: find a good link to what this object is.
7380
has_hosts = source.fields.get("hosts")
7481
has_api_key = source.fields.get("api_key")
7582
has_basic_auth = source.fields.get("username") and source.fields.get("password")
83+
84+
assumed_configuration = {}
85+
86+
# Log-related
87+
assumed_configuration["service"] = {}
88+
assumed_configuration["service"]["log_level"] = unit.log_level
89+
90+
# Auth-related
7691
if has_hosts and (has_api_key or has_basic_auth):
77-
es_creds = {
78-
"host": source["hosts"][0],
79-
}
92+
es_creds = {"host": source["hosts"][0]}
8093

8194
if source.fields.get("api_key"):
95+
logger.debug("Found api_key")
8296
api_key = source["api_key"]
8397
# if beats_logstash_format we need to base64 the key
8498
if ":" in api_key:
8599
api_key = base64.b64encode(api_key.encode()).decode()
86100

87101
es_creds["api_key"] = api_key
88102
elif source.fields.get("username") and source.fields.get("password"):
103+
logger.debug("Found username and passowrd")
89104
es_creds["username"] = source["username"]
90105
es_creds["password"] = source["password"]
91106
else:
92107
msg = "Invalid Elasticsearch credentials"
93108
raise ValueError(msg)
94109

95-
new_config = {
96-
"elasticsearch": es_creds,
97-
}
98-
self.specific_config = new_config
110+
assumed_configuration["elasticsearch"] = es_creds
111+
112+
if self.config_changed(assumed_configuration):
113+
logger.debug("Changes detected for connectors-relevant configurations")
114+
# This is a partial update.
115+
# Agent can send different data in updates.
116+
# For example, updating only log_level will not send credentials.
117+
# Thus we don't overwrite configuration, we only update fields that
118+
# were received
119+
self.specific_config.update(assumed_configuration)
120+
return True
121+
122+
logger.debug("No changes detected for connectors-relevant configurations")
123+
return False
124+
125+
def config_changed(self, new_config):
126+
"""See if configuration passed in new_config will update currently stored configuration
127+
128+
This method takes the new configuration received from the agent and see if there are any changes
129+
to existing configuration.
130+
131+
If new_config contains new values for relevant fields, then True is returned, otherwise it returns False.
132+
"""
133+
# TODO: For now manually check, need to think of a better way?
134+
# Not super proud of this function, but hey it's tested
135+
logger.debug("Checking if config changed")
136+
current_config = self._default_config.copy()
137+
current_config.update(self.specific_config)
138+
139+
def _log_level_changed():
140+
new_config_log_level = nested_get_from_dict(
141+
new_config, ("service", "log_level")
142+
)
143+
current_config_log_level = nested_get_from_dict(
144+
current_config, ("service", "log_level")
145+
)
146+
147+
if new_config_log_level is None:
148+
return False
149+
150+
return current_config_log_level != new_config_log_level
151+
152+
def _elasticsearch_config_changed():
153+
return current_config.get("elasticsearch") != new_config.get(
154+
"elasticsearch"
155+
)
156+
157+
if _log_level_changed():
158+
logger.debug("log_level changed")
159+
return True
160+
161+
if _elasticsearch_config_changed():
162+
logger.debug("elasticsearch changed")
99163
return True
100164

101165
return False
@@ -112,8 +176,8 @@ def get(self):
112176
"""
113177
# First take "default config"
114178
config = self._default_config.copy()
115-
# Then override with what we get from Agent
116-
config.update(self.specific_config)
179+
# Then merge with what we get from Agent
180+
config = dict(add_defaults(self.specific_config, default_config=config))
117181
# Then merge with default connectors config
118182
configuration = dict(add_defaults(config))
119183

connectors/agent/logger.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#
2+
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
# or more contributor license agreements. Licensed under the Elastic License 2.0;
4+
# you may not use this file except in compliance with the Elastic License 2.0.
5+
#
6+
import logging
7+
8+
import ecs_logging
9+
10+
root_logger = logging.getLogger("agent_component")
11+
handler = logging.StreamHandler()
12+
handler.setFormatter(ecs_logging.StdlibFormatter())
13+
root_logger.addHandler(handler)
14+
root_logger.setLevel(logging.INFO)
15+
16+
17+
def get_logger(module):
18+
logger = root_logger.getChild(module)
19+
20+
if logger.hasHandlers():
21+
return logger
22+
23+
logger.addHandler(handler)
24+
25+
return logger
26+
27+
28+
def update_logger_level(log_level):
29+
root_logger.setLevel(log_level)

connectors/agent/protocol.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
# or more contributor license agreements. Licensed under the Elastic License 2.0;
44
# you may not use this file except in compliance with the Elastic License 2.0.
55
#
6+
67
from elastic_agent_client.generated import elastic_agent_client_pb2 as proto
78
from elastic_agent_client.handler.action import BaseActionHandler
89
from elastic_agent_client.handler.checkin import BaseCheckinHandler
9-
from elastic_agent_client.util.logger import logger
10+
11+
from connectors.agent.logger import get_logger
12+
13+
logger = get_logger("protocol")
1014

1115

1216
class ConnectorActionHandler(BaseActionHandler):
@@ -64,12 +68,10 @@ async def apply_from_client(self):
6468
for unit in self.client.units
6569
if unit.unit_type == proto.UnitType.OUTPUT
6670
]
67-
6871
if len(outputs) > 0 and outputs[0].config:
6972
logger.debug("Outputs were found")
70-
source = outputs[0].config.source
7173

72-
changed = self.agent_connectors_config_wrapper.try_update(source)
74+
changed = self.agent_connectors_config_wrapper.try_update(outputs[0])
7375
if changed:
7476
logger.info("Updating connector service manager config")
7577
self.service_manager.restart()

connectors/agent/service_manager.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,19 @@
33
# or more contributor license agreements. Licensed under the Elastic License 2.0;
44
# you may not use this file except in compliance with the Elastic License 2.0.
55
#
6-
from connectors.logger import DocumentLogger
6+
import logging
7+
8+
import connectors.agent.logger
9+
import connectors.logger
10+
from connectors.agent.logger import get_logger
711
from connectors.services.base import (
812
ServiceAlreadyRunningError,
913
get_services,
1014
)
1115
from connectors.utils import CancellableSleeps
1216

17+
logger = get_logger("service_manager")
18+
1319

1420
class ConnectorServiceManager:
1521
"""Class responsible for properly configuring and running Connectors Service in Elastic Agent
@@ -22,8 +28,6 @@ class ConnectorServiceManager:
2228
2329
"""
2430

25-
name = "connector-service-manager"
26-
2731
def __init__(self, configuration):
2832
"""Inits ConnectorServiceManager with shared ConnectorsAgentConfigurationWrapper.
2933
@@ -32,10 +36,6 @@ def __init__(self, configuration):
3236
3337
There is nothing enforcing it, but expect problems if that happens.
3438
"""
35-
service_name = self.__class__.name
36-
self._logger = DocumentLogger(
37-
f"[{service_name}]", {"service_name": service_name}
38-
)
3939
self._agent_config = configuration
4040
self._multi_service = None
4141
self._running = False
@@ -59,26 +59,34 @@ async def run(self):
5959
try:
6060
while self._running:
6161
try:
62-
self._logger.info("Starting connector services")
62+
logger.info("Starting connector services")
63+
config = self._agent_config.get()
6364
self._multi_service = get_services(
6465
["schedule", "sync_content", "sync_access_control", "cleanup"],
65-
self._agent_config.get(),
66+
config,
6667
)
68+
log_level = config.get("service", {}).get(
69+
"log_level", logging.INFO
70+
) # Log Level for connectors is managed like this
71+
connectors.logger.set_logger(log_level, filebeat=True)
72+
# Log Level for agent connectors component itself
73+
connectors.agent.logger.update_logger_level(log_level)
74+
6775
await self._multi_service.run()
6876
except Exception as e:
69-
self._logger.exception(
77+
logger.exception(
7078
f"Error while running services in ConnectorServiceManager: {e}"
7179
)
7280
raise
7381
finally:
74-
self._logger.info("Finished running, exiting")
82+
logger.info("Finished running, exiting")
7583

7684
def stop(self):
7785
"""Stop the service manager and all running subservices.
7886
7987
Running stop attempts to gracefully shutdown all subservices currently running.
8088
"""
81-
self._logger.info("Stopping connector services.")
89+
logger.info("Stopping connector services.")
8290
self._running = False
8391
self._done = True
8492
if self._multi_service:
@@ -91,6 +99,6 @@ def restart(self):
9199
After services are gracefully stopped, they will be started again with fresh configuration
92100
that comes from ConnectorsAgentConfigurationWrapper.
93101
"""
94-
self._logger.info("Restarting connector services")
102+
logger.info("Restarting connector services")
95103
if self._multi_service:
96104
self._multi_service.shutdown(None)

connectors/config.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ def load_config(config_file):
2828
return configuration
2929

3030

31-
def add_defaults(config):
32-
configuration = dict(_merge_dicts(_default_config(), config))
31+
def add_defaults(config, default_config=None):
32+
if default_config is None:
33+
default_config = _default_config()
34+
configuration = dict(_merge_dicts(default_config, config))
3335
return configuration
3436

3537

0 commit comments

Comments
 (0)