106 changes: 106 additions & 0 deletions docs/source/developer-guide/add_metrics.md
@@ -0,0 +1,106 @@
# Custom Metrics
UCM supports custom metrics with bidirectional updates from both the Python and C++ runtimes. The unified monitoring interface lets either runtime update stats across the language boundary through a shared metrics registry.

## Architecture Overview
The metrics system consists of the following components:
- **monitor**: Central stats registry that manages the full metric lifecycle (registration, creation, updates, queries)
- **observability.py**: Prometheus integration layer that handles metric exposition
- **metrics_config.yaml**: Declarative configuration that defines which custom metrics to register and their properties

## Getting Started
### Define Metrics in YAML
Prometheus provides three fundamental metric types: Counter, Gauge, and Histogram, and UCM implements a wrapper for each. To add new metrics, declare them in a YAML configuration file and pass its path to UCM via the `metrics_config_path` entry of the connector launch config. The layout is shown below; refer to the [example YAML](https://github.com/ModelEngine-Group/unified-cache-management/blob/develop/examples/metrics/metrics_configs.yaml) for more details.
```yaml
log_interval: 5  # Interval in seconds for logging metrics

prometheus:
  multiproc_dir: "/vllm-workspace"  # Directory for Prometheus multiprocess mode

  metric_prefix: "ucm:"

  # Enable/disable metrics by category
  enabled_metrics:
    counters: true
    gauges: true
    histograms: true

  # Counter metrics configuration
  counters:
    - name: "received_requests"
      documentation: "Total number of requests sent to ucm"

  # Gauge metrics configuration
  gauges:
    - name: "external_lookup_hit_rate"
      documentation: "Hit rate of ucm lookup requests"
      multiprocess_mode: "livemostrecent"

  # Histogram metrics configuration
  histograms:
    - name: "load_requests_num"
      documentation: "Number of requests loaded from ucm"
      buckets: [1, 5, 10, 20, 50, 100, 200, 500, 1000]
```

### Use Monitor APIs to Update Stats
The monitor provides a unified interface for metric operations. Users only need to create stats and update them, while the observability component is responsible for fetching the stats and pushing them to Prometheus.
:::::{tab-set}
:sync-group: install

::::{tab-item} Python side interfaces
:selected:
:sync: py
**Lifecycle Methods**
- `create_stats(name)`: Create and initialize a registered stats object.

**Operation Methods**
- `update_stats(name, dict)`: Update specific fields of a specific stats object.
- `get_stats(name)`: Retrieve current values of a specific stats object.
- `get_stats_and_clear(name)`: Retrieve and reset a specific stats object.
- `get_all_stats_and_clear()`: Retrieve and reset all stats objects.
- `reset_stats(name)`: Reset a specific stats object to initial state.
- `reset_all()`: Reset all stats registered in monitor.

**Example:** Using built-in ConnStats
```python
from ucm.shared.metrics import ucmmonitor

ucmmonitor.create_stats("ConnStats")  # Create a stats object

# Update stats; external_hit_blocks and ucm_block_ids come from the surrounding lookup logic
ucmmonitor.update_stats(
    "ConnStats",
    {"interval_lookup_hit_rates": external_hit_blocks / len(ucm_block_ids)},
)

```
See a more detailed example in the [test case](https://github.com/ModelEngine-Group/unified-cache-management/tree/develop/ucm/shared/test/example).

::::

::::{tab-item} C++ side interfaces
:sync: cc
**Lifecycle Methods**
- `CreateStats(const std::string& name)`: Create and initialize a registered stats object.

**Operation Methods**
- `UpdateStats(const std::string& name, const std::unordered_map<std::string, double>& params)`: Update specific fields of a specific stats object.
- `GetStats(const std::string& name)`: Retrieve current values of a specific stats object.
- `GetStatsAndClear(const std::string& name)`: Retrieve and reset a specific stats object.
- `GetAllStatsAndClear()`: Retrieve and reset all stats objects.
- `ResetStats(const std::string& name)`: Reset a specific stats object to its initial state.
- `ResetAllStats()`: Reset all stats registered in the monitor.

**Example:** Implementing custom stats in C++
Adding a custom metric from C++ takes the following steps (a usage sketch follows the list):
- Step 1: Link against the static library `monitor_static`:
```cmake
target_link_libraries(xxxstore PUBLIC storeinfra monitor_static)
```
- Step 2: Create a stats object with **CreateStats**.
- Step 3: Update its fields with **UpdateStats**.
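For steps 2 and 3, a minimal sketch is shown below. It assumes the monitor is exposed as a `StatsMonitor` singleton with a `GetInstance()` accessor and that the header lives at `monitor/stats_monitor.h` (check the monitor headers for the actual names); the method signatures follow the list above, and the `ConnStats` object with its `interval_lookup_hit_rates` field mirrors the Python example.
```c++
#include <cstddef>
#include <string>
#include <unordered_map>

#include "monitor/stats_monitor.h"  // assumed header location for the monitor library

// Step 2: create and initialize the registered stats object, typically once at startup.
void InitMetrics()
{
    StatsMonitor::GetInstance().CreateStats("ConnStats");
}

// Step 3: update individual fields of the stats object from the hot path.
void ReportLookupHit(std::size_t externalHitBlocks, std::size_t totalBlocks)
{
    if (totalBlocks == 0) {
        return;
    }
    StatsMonitor::GetInstance().UpdateStats(
        "ConnStats",
        {{"interval_lookup_hit_rates",
          static_cast<double>(externalHitBlocks) / static_cast<double>(totalBlocks)}});
}
```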

See a more detailed example in the [test case](https://github.com/ModelEngine-Group/unified-cache-management/tree/develop/ucm/shared/test/case).

::::
:::::
1 change: 1 addition & 0 deletions docs/source/index.md
@@ -63,6 +63,7 @@ user-guide/metrics/metrics
:caption: Developer Guide
:maxdepth: 1
developer-guide/contribute
developer-guide/add_metrics
:::

:::{toctree}
10 changes: 5 additions & 5 deletions ucm/integration/vllm/ucm_connector.py
@@ -18,8 +18,8 @@
from vllm.v1.core.sched.output import SchedulerOutput

from ucm.logger import init_logger
from ucm.observability import UCMStatsLogger
from ucm.shared.metrics import ucmmonitor
from ucm.shared.metrics.observability import UCMStatsLogger
from ucm.store.factory import UcmConnectorFactory
from ucm.store.ucmstore import Task, UcmKVStoreBase
from ucm.utils import Config
@@ -172,12 +172,12 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):

self.metrics_config = self.launch_config.get("metrics_config_path", "")
if self.metrics_config:
ucmmonitor.create_stats("ConnStats")
self.stats_logger = UCMStatsLogger(
vllm_config.model_config.served_model_name,
self.global_rank,
self.metrics_config,
)
self.monitor = ucmmonitor.StatsMonitor.get_instance()

self.synchronize = (
torch.cuda.synchronize
@@ -236,7 +236,7 @@ def get_num_new_matched_tokens(
f"hit external: {external_hit_blocks}"
)
if self.metrics_config:
self.monitor.update_stats(
ucmmonitor.update_stats(
"ConnStats",
{"interval_lookup_hit_rates": external_hit_blocks / len(ucm_block_ids)},
)
@@ -532,7 +532,7 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
/ 1024
) # GB/s
if self.metrics_config and is_load:
self.monitor.update_stats(
ucmmonitor.update_stats(
"ConnStats",
{
"load_requests_num": num_loaded_request,
@@ -622,7 +622,7 @@ def wait_for_save(self) -> None:
/ 1024
) # GB/s
if self.metrics_config and is_save:
self.monitor.update_stats(
ucmmonitor.update_stats(
"ConnStats",
{
"save_requests_num": num_saved_request,
99 changes: 30 additions & 69 deletions ucm/shared/metrics/observability.py → ucm/observability.py
@@ -21,22 +21,17 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

# Inspired by https://github.com/LMCache/LMCache/blob/dev/lmcache/observability.py

import os
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from typing import Any, List

import prometheus_client
import yaml

# Third Party
from prometheus_client import REGISTRY
from vllm.distributed.parallel_state import get_world_group

from ucm.logger import init_logger
from ucm.shared.metrics import ucmmonitor

@@ -56,7 +51,7 @@ class PrometheusLogger:
_counter_cls = prometheus_client.Counter
_histogram_cls = prometheus_client.Histogram

def __init__(self, metadata: UCMEngineMetadata, config: Dict[str, Any]):
def __init__(self, metadata: UCMEngineMetadata, config: dict[str, Any]):
# Ensure PROMETHEUS_MULTIPROC_DIR is set before any metric registration
prometheus_config = config.get("prometheus", {})
multiproc_dir = prometheus_config.get("multiproc_dir", "/vllm-workspace")
@@ -74,7 +69,7 @@ def __init__(self, metadata: UCMEngineMetadata, config: Dict[str, Any]):
self._init_metrics_from_config(labelnames, prometheus_config)

def _init_metrics_from_config(
self, labelnames: List[str], prometheus_config: Dict[str, Any]
self, labelnames: List[str], prometheus_config: dict[str, Any]
):
"""Initialize metrics based on configuration"""
enabled = prometheus_config.get("enabled_metrics", {})
@@ -85,7 +80,7 @@ def _init_metrics_from_config(

# Store metric mapping: metric_name -> (metric_type, attribute_name, stats_field_name)
# This mapping will be used in log_prometheus to dynamically log metrics
self.metric_mappings: Dict[str, Dict[str, str]] = {}
self.metric_mappings: dict[str, dict[str, str]] = {}

# Initialize counters
if enabled.get("counters", True):
@@ -172,49 +167,46 @@ def _init_metrics_from_config(
"attr": attr_name,
}

def _log_gauge(self, gauge, data: Union[int, float]) -> None:
def _set_gauge(self, gauge, data: List) -> None:
# Convenience function for logging to gauge.
if not data:
return
gauge.labels(**self.labels).set(data)

def _log_counter(self, counter, data: Union[int, float]) -> None:
def _inc_counter(self, counter, data: List) -> None:
# Convenience function for logging to counter.
# Prevent ValueError from negative increment
if data < 0:
return
counter.labels(**self.labels).inc(data)
counter.labels(**self.labels).inc(sum(data))

def _log_histogram(self, histogram, data: Union[List[int], List[float]]) -> None:
def _observe_histogram(self, histogram, data: List) -> None:
# Convenience function for logging to histogram.
for value in data:
histogram.labels(**self.labels).observe(value)

def log_prometheus(self, stats: Any):
def update_stats(self, stats: dict[str, List]):
"""Log metrics to Prometheus based on configuration file"""
# Dynamically log metrics based on what's configured in YAML
for stat_name, value in stats.items():
try:
metric_mapped = self.metric_mappings[stat_name]
if metric_mapped is None:
logger.warning(f"Stat {stat_name} not initialized.")
logger.debug(f"Stat {stat_name} not initialized.")
continue
metric_obj = getattr(self, metric_mapped["attr"], None)
metric_type = metric_mapped["type"]

# Log based on metric type
if metric_type == "counter":
self._log_counter(metric_obj, value)
self._inc_counter(metric_obj, value)
elif metric_type == "gauge":
self._log_gauge(metric_obj, value)
self._set_gauge(metric_obj, value)
elif metric_type == "histogram":
# Histograms expect a list
if not isinstance(value, list):
if value:
value = [value]
else:
value = []
self._log_histogram(metric_obj, value)
except Exception as e:
logger.warning(f"Failed to log metric {stat_name}: {e}")
self._observe_histogram(metric_obj, value)
else:
logger.error(f"Not found metric type for {stat_name}")
except Exception:
logger.debug(f"Failed to log metric {stat_name}")

@staticmethod
def _metadata_to_labels(metadata: UCMEngineMetadata):
@@ -223,40 +215,6 @@ def _metadata_to_labels(metadata: UCMEngineMetadata):
"worker_id": metadata.worker_id,
}

_instance = None

@staticmethod
def GetOrCreate(
metadata: UCMEngineMetadata,
config_path: str = "",
) -> "PrometheusLogger":
if PrometheusLogger._instance is None:
PrometheusLogger._instance = PrometheusLogger(metadata, config_path)
# assert PrometheusLogger._instance.metadata == metadata, \
# "PrometheusLogger instance already created with different metadata"
if PrometheusLogger._instance.metadata != metadata:
logger.error(
"PrometheusLogger instance already created with"
"different metadata. This should not happen except "
"in test"
)
return PrometheusLogger._instance

@staticmethod
def GetInstance() -> "PrometheusLogger":
assert (
PrometheusLogger._instance is not None
), "PrometheusLogger instance not created yet"
return PrometheusLogger._instance

@staticmethod
def GetInstanceOrNone() -> Optional["PrometheusLogger"]:
"""
Returns the singleton instance of PrometheusLogger if it exists,
otherwise returns None.
"""
return PrometheusLogger._instance


class UCMStatsLogger:
def __init__(self, model_name: str, rank: int, config_path: str = ""):
@@ -267,15 +225,13 @@ def __init__(self, model_name: str, rank: int, config_path: str = ""):
# Load configuration
config = self._load_config(config_path)
self.log_interval = config.get("log_interval", 10)

self.monitor = ucmmonitor.StatsMonitor.get_instance()
self.prometheus_logger = PrometheusLogger.GetOrCreate(self.metadata, config)
self.prometheus_logger = PrometheusLogger(self.metadata, config)
self.is_running = True

self.thread = threading.Thread(target=self.log_worker, daemon=True)
self.thread.start()

def _load_config(self, config_path: str) -> Dict[str, Any]:
def _load_config(self, config_path: str) -> dict[str, Any]:
"""Load configuration from YAML file"""
try:
with open(config_path, "r") as f:
@@ -295,11 +251,16 @@ def _load_config(self, config_path: str) -> Dict[str, Any]:

def log_worker(self):
while self.is_running:
# Use UCMStatsMonitor.get_states_and_clear() from external import
stats = self.monitor.get_stats_and_clear("ConnStats")
self.prometheus_logger.log_prometheus(stats)
stats = ucmmonitor.get_all_stats_and_clear().data
self.prometheus_logger.update_stats(stats)
time.sleep(self.log_interval)

def shutdown(self):
self.is_running = False
self.thread.join()

def __del__(self):
try:
self.shutdown()
except Exception:
pass