diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 9ea5abd4..76053349 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -30,9 +30,54 @@ jobs: strategy: matrix: - workload: - - sync-table - - sync-query + include: + - prefix: table + workload: sync-table + create-args: grpc://localhost:2135 /Root/testdb + run-args: | + grpc://localhost:2135 /Root/testdb \ + --prom-pgw localhost:9091 \ + --report-period 250 \ + --time ${{inputs.slo_workload_duration_seconds || 600}} \ + --read-rps ${{inputs.slo_workload_read_max_rps || 1000}} \ + --write-rps ${{inputs.slo_workload_write_max_rps || 100}} \ + --read-timeout 1000 \ + --write-timeout 1000 + cleanup-args: grpc://localhost:2135 /Root/testdb + - prefix: table + workload: sync-query + create-args: grpc://localhost:2135 /Root/testdb + run-args: | + grpc://localhost:2135 /Root/testdb \ + --prom-pgw localhost:9091 \ + --report-period 250 \ + --time ${{inputs.slo_workload_duration_seconds || 600}} \ + --read-rps ${{inputs.slo_workload_read_max_rps || 1000}} \ + --write-rps ${{inputs.slo_workload_write_max_rps || 100}} \ + --read-timeout 1000 \ + --write-timeout 1000 + cleanup-args: grpc://localhost:2135 /Root/testdb + # - prefix: topic + # workload: topic-basic + # create-args: | + # grpc://localhost:2135 /Root/testdb \ + # --path /Root/testdb/slo_topic \ + # --partitions-count 10 + # run-args: | + # grpc://localhost:2135 /Root/testdb \ + # --path /Root/testdb/slo_topic \ + # --prom-pgw localhost:9091 \ + # --partitions-count 10 \ + # --read-threads 10 \ + # --write-threads 10 \ + # --report-period 250 \ + # --time ${{inputs.slo_workload_duration_seconds || 600}} \ + # --read-rps ${{inputs.slo_workload_read_max_rps || 100}} \ + # --write-rps ${{inputs.slo_workload_write_max_rps || 100}} \ + # --read-timeout 5000 \ + # --write-timeout 5000 + # cleanup-args: grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic + concurrency: group: slo-${{ github.ref }}-${{ matrix.workload }} @@ -64,26 +109,19 @@ jobs: - name: Prepare SLO Database run: | - python ./tests/slo/src create grpc://localhost:2135 /Root/testdb + python ./tests/slo/src ${{ matrix.prefix }}-create ${{ matrix.create-args }} - name: Run SLO Tests env: REF: '${{ github.head_ref || github.ref }}' WORKLOAD: '${{ matrix.workload }}' run: | - python ./tests/slo/src run grpc://localhost:2135 /Root/testdb \ - --prom-pgw localhost:9091 \ - --report-period 250 \ - --time ${{inputs.slo_workload_duration_seconds || 600}} \ - --read-rps ${{inputs.slo_workload_read_max_rps || 1000}} \ - --write-rps ${{inputs.slo_workload_write_max_rps || 100}} \ - --read-timeout 1000 \ - --write-timeout 1000 + python ./tests/slo/src ${{ matrix.prefix }}-run ${{ matrix.run-args }} - if: always() name: Cleanup SLO Database run: | - python ./tests/slo/src cleanup grpc://localhost:2135 /Root/testdb + python ./tests/slo/src ${{ matrix.prefix }}-cleanup ${{ matrix.cleanup-args }} - if: always() name: Store ydb chaos testing logs diff --git a/tests/slo/README.md b/tests/slo/README.md index 486cbe99..0cb2070c 100644 --- a/tests/slo/README.md +++ b/tests/slo/README.md @@ -3,42 +3,72 @@ SLO is the type of test where app based on ydb-sdk is tested against falling YDB cluster nodes, tablets, network (that is possible situations for distributed DBs with hundreds of nodes) -### Implementations: +### Workload types: + +There are two workload types: + +- **Table SLO** - tests table operations (read/write) +- **Topic SLO** - tests topic operations (publish/consume) -There are two 
implementations: +### Implementations: - `sync` - `async` (now unimplemented) ### Usage: -It has 3 commands: +Each workload type has 3 commands: + +**Table commands:** +- `table-create` - creates table in database +- `table-cleanup` - drops table in database +- `table-run` - runs table workload (read and write to table with set RPS) -- `create` - creates table in database -- `cleanup` - drops table in database -- `run` - runs workload (read and write to table with sets RPS) +**Topic commands:** +- `topic-create` - creates topic with consumer in database +- `topic-cleanup` - drops topic in database +- `topic-run` - runs topic workload (publish and consume messages with set RPS) ### Run examples with all arguments: -create: -`python tests/slo/src/ create localhost:2136 /local -t tableName +**Table examples:** + +table-create: +`python tests/slo/src/ table-create localhost:2136 /local -t tableName --min-partitions-count 6 --max-partitions-count 1000 --partition-size 1 -с 1000 --write-timeout 10000` -cleanup: -`python tests/slo/src/ cleanup localhost:2136 /local -t tableName` +table-cleanup: +`python tests/slo/src/ table-cleanup localhost:2136 /local -t tableName` -run: -`python tests/slo/src/ run localhost:2136 /local -t tableName ---prom-pgw http://prometheus-pushgateway:9091 -report-period 250 +table-run: +`python tests/slo/src/ table-run localhost:2136 /local -t tableName +--prom-pgw http://prometheus-pushgateway:9091 --report-period 250 --read-rps 1000 --read-timeout 10000 --write-rps 100 --write-timeout 10000 --time 600 --shutdown-time 30` +**Topic examples:** + +topic-create: +`python tests/slo/src/ topic-create localhost:2136 /local +--topic-path /local/slo_topic --topic-consumer slo_consumer` + +topic-cleanup: +`python tests/slo/src/ topic-cleanup localhost:2136 /local --topic-path /local/slo_topic` + +topic-run: +`python tests/slo/src/ topic-run localhost:2136 /local +--topic-path /local/slo_topic --topic-consumer slo_consumer +--prom-pgw http://prometheus-pushgateway:9091 --report-period 250 +--topic-write-rps 50 --topic-read-rps 100 +--topic-write-timeout 5000 --topic-read-timeout 3000 +--time 600 --shutdown-time 30` + ## Arguments for commands: -### create -`python tests/slo/src/ create [options]` +### table-create +`python tests/slo/src/ table-create [options]` ``` Arguments: @@ -61,8 +91,8 @@ Options: ``` -### cleanup -`python tests/slo/src/ cleanup [options]` +### table-cleanup +`python tests/slo/src/ table-cleanup [options]` ``` Arguments: @@ -73,8 +103,8 @@ Options: -t --table-name table name to create ``` -### run -`python tests/slo/src/ run [options]` +### table-run +`python tests/slo/src/ table-run [options]` ``` Arguments: @@ -100,12 +130,70 @@ Options: --write-threads number of threads to use for read requests ``` +### topic-create +`python tests/slo/src/ topic-create [options]` + +``` +Arguments: + endpoint YDB endpoint to connect to + db YDB database to connect to + +Options: + --topic-path topic path to create + --topic-consumer consumer name + --topic-min-partitions minimum active partitions + --topic-max-partitions maximum active partitions + --topic-retention-hours retention period in hours +``` + +### topic-cleanup +`python tests/slo/src/ topic-cleanup [options]` + +``` +Arguments: + endpoint YDB endpoint to connect to + db YDB database to connect to + +Options: + --topic-path topic path to drop +``` + +### topic-run +`python tests/slo/src/ topic-run [options]` + +``` +Arguments: + endpoint YDB endpoint to connect to + db YDB database to connect to + +Options: + 
--topic-path topic path + --topic-consumer consumer name + + --prom-pgw prometheus push gateway + --report-period prometheus push period in milliseconds + + --topic-read-rps read RPS for topics + --topic-read-timeout read timeout milliseconds for topics + --topic-write-rps write RPS for topics + --topic-write-timeout write timeout milliseconds for topics + + --topic-message-size message size in bytes + --topic-read-threads number of threads to use for read requests + --topic-write-threads number of threads to use for write requests + + --time run time in seconds + --shutdown-time graceful shutdown time in seconds +``` + ## Authentication Workload using [auth-env](https://ydb.yandex-team.ru/docs/reference/ydb-sdk/recipes/auth-env) for authentication. ## What's inside -When running `run` command, the program creates three jobs: `readJob`, `writeJob`, `metricsJob`. + +### Table workload +When running `table-run` command, the program creates three jobs: `readJob`, `writeJob`, `metricsJob`. - `readJob` reads rows from the table one by one with random identifiers generated by writeJob - `writeJob` generates and inserts rows @@ -120,6 +208,18 @@ Table have these fields: Primary key: `("object_hash", "object_id")` +### Topic workload +When running `topic-run` command, the program creates three jobs: `readJob`, `writeJob`, `metricsJob`. + +- `readJob` reads messages from topic using TopicReader and commits offsets +- `writeJob` generates and publishes messages to topic using TopicWriter +- `metricsJob` periodically sends metrics to Prometheus + +Messages contain: +- Sequential message ID +- Thread identifier +- Configurable payload size (padded with 'x' characters) + ## Collected metrics - `oks` - amount of OK requests - `not_oks` - amount of not OK requests @@ -127,6 +227,8 @@ Primary key: `("object_hash", "object_id")` - `latency` - summary of latencies in ms - `attempts` - summary of amount for request +Metrics are collected for both table operations (`read`, `write`) and topic operations (`read`, `write`). 
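+
+A minimal sketch of how job code records one measured operation using the helpers from
+`src/core/metrics.py` (passing an empty `--prom-pgw` value makes `create_metrics` return a
+no-op `DummyMetrics`, so the same code also runs with metrics pushing disabled):
+
+```python
+from core.metrics import create_metrics, OP_TYPE_READ
+
+metrics = create_metrics("localhost:9091")  # or "" to skip pushing to Prometheus
+ts = metrics.start((OP_TYPE_READ,))
+# ... perform the measured read or write request here ...
+metrics.stop((OP_TYPE_READ,), ts, attempts=1, error=None)
+```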
+ > You must reset metrics to keep them `0` in prometheus and grafana before beginning and after ending of jobs ## Look at metrics in grafana diff --git a/tests/slo/playground/README.md b/tests/slo/playground/README.md new file mode 100644 index 00000000..eefb5cf0 --- /dev/null +++ b/tests/slo/playground/README.md @@ -0,0 +1,40 @@ +# SLO playground + +Playground may be used for testing SLO workloads locally + +It has several services: + +- `prometheus` - storage for metrics +- `prometheus-pushgateway` - push acceptor for prometheus +- `grafana` - provides chats for metrics +- `ydb` - local instance of ydb-database to run workload with + +## Network addresses + +- Grafana dashboard: http://localhost:3000 +- Prometheus pushgateway: http://localhost:9091 +- YDB monitoring: http://localhost:8765 +- YDB GRPC: grpc://localhost:2136 +- YDB GRPC TLS: grpcs://localhost:2135 + +## Start + +```shell +docker-compose up -d +``` + +## Stop + +```shell +docker-compose down +``` + +## Configs + +Grafana's dashboards stored in `configs/grafana/provisioning/dashboards` + +## Data + +YDB databases are not persistent + +All other data like metrics and certs stored in `data/` \ No newline at end of file diff --git a/tests/slo/playground/configs/chaos.sh b/tests/slo/playground/configs/chaos.sh new file mode 100755 index 00000000..550a6740 --- /dev/null +++ b/tests/slo/playground/configs/chaos.sh @@ -0,0 +1,52 @@ +#!/bin/sh -e + +get_random_container() { + # Get a list of all containers starting with ydb-database-* + containers=$(docker ps --format '{{.Names}}' | grep '^ydb-database-') + + # Convert the list to a newline-separated string + containers=$(echo "$containers" | tr ' ' '\n') + + # Count the number of containers + containersCount=$(echo "$containers" | wc -l) + + # Generate a random number between 0 and containersCount - 1 + randomIndex=$(shuf -i 0-$(($containersCount - 1)) -n 1) + + # Get the container name at the random index + nodeForChaos=$(echo "$containers" | sed -n "$(($randomIndex + 1))p") +} + + +sleep 20 + +echo "Start CHAOS YDB cluster!" 
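+
+# The sleep above gives the freshly started cluster time to come up before the first
+# stop/start cycle. Each iteration below picks a random dynamic node (a ydb-database-*
+# container), stops it with a 10s grace period, starts it again and waits 60s before
+# the next hit.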
+ +for i in $(seq 1 1000) +do + echo "[$(date)]: docker stop/start iteration $i" + + get_random_container + + sh -c "docker stop ${nodeForChaos} -t 10" + sh -c "docker start ${nodeForChaos}" + + sleep 60 +done + +# for i in $(seq 1 3) +# do +# echo "[$(date)]: docker restart iteration $i" + +# get_random_container + +# sh -c "docker restart ${nodeForChaos} -t 0" + +# sleep 60 +# done + +# get_random_container + +# echo "[$(date)]: docker kill -s SIGKILL ${nodeForChaos}" + +# sh -c "docker kill -s SIGKILL ${nodeForChaos}" \ No newline at end of file diff --git a/tests/slo/playground/configs/compose.yaml b/tests/slo/playground/configs/compose.yaml new file mode 100644 index 00000000..eb09e406 --- /dev/null +++ b/tests/slo/playground/configs/compose.yaml @@ -0,0 +1,283 @@ +x-runtime: &runtime + hostname: localhost + platform: linux/amd64 + privileged: true + network_mode: host + +x-ydb-node: &ydb-node + image: cr.yandex/crptqonuodf51kdj7a7d/ydb:24.4.4.12 + restart: always + <<: *runtime + volumes: + - ./ydb.yaml:/opt/ydb/cfg/config.yaml + +name: ydb + +services: + static-0: + <<: *ydb-node + container_name: ydb-static-0 + command: + - /opt/ydb/bin/ydbd + - server + - --grpc-port + - "2135" + - --mon-port + - "8765" + - --ic-port + - "19001" + - --yaml-config + - /opt/ydb/cfg/config.yaml + - --node + - static + - --label + - deployment=docker + ports: + - 2135:2135 + - 8765:8765 + - 19001:19001 + healthcheck: + test: bash -c "exec 6<> /dev/tcp/localhost/2135" + interval: 10s + timeout: 1s + retries: 3 + start_period: 30s + + static-init: + <<: *ydb-node + restart: on-failure + container_name: ydb-static-init + command: + - /opt/ydb/bin/ydbd + - -s + - grpc://localhost:2135 + - admin + - blobstorage + - config + - init + - --yaml-file + - /opt/ydb/cfg/config.yaml + depends_on: + static-0: + condition: service_healthy + + tenant-init: + <<: *ydb-node + restart: on-failure + container_name: ydb-tenant-init + command: + - /opt/ydb/bin/ydbd + - -s + - grpc://localhost:2135 + - admin + - database + - /Root/testdb + - create + - ssd:1 + depends_on: + static-init: + condition: service_completed_successfully + + database-1: + <<: *ydb-node + container_name: ydb-database-1 + command: + - /opt/ydb/bin/ydbd + - server + - --grpc-port + - "2136" + - --mon-port + - "8766" + - --ic-port + - "19002" + - --yaml-config + - /opt/ydb/cfg/config.yaml + - --tenant + - /Root/testdb + - --node-broker + - grpc://localhost:2135 + - --label + - deployment=docker + ports: + - 2136:2136 + - 8766:8766 + - 19002:19002 + healthcheck: + test: bash -c "exec 6<> /dev/tcp/localhost/2136" + interval: 10s + timeout: 1s + retries: 3 + start_period: 30s + depends_on: + static-0: + condition: service_healthy + static-init: + condition: service_completed_successfully + tenant-init: + condition: service_completed_successfully + + # database-2: + # <<: *ydb-node + # container_name: ydb-database-2 + # command: + # - /opt/ydb/bin/ydbd + # - server + # - --grpc-port + # - "2137" + # - --mon-port + # - "8767" + # - --ic-port + # - "19003" + # - --yaml-config + # - /opt/ydb/cfg/config.yaml + # - --tenant + # - /Root/testdb + # - --node-broker + # - grpc://localhost:2135 + # - --label + # - deployment=docker + # ports: + # - 2137:2137 + # - 8767:8767 + # - 19003:19003 + # healthcheck: + # test: bash -c "exec 6<> /dev/tcp/localhost/2137" + # interval: 10s + # timeout: 1s + # retries: 3 + # start_period: 30s + # depends_on: + # static-0: + # condition: service_healthy + # static-init: + # condition: service_completed_successfully + # 
tenant-init: + # condition: service_completed_successfully + + # database-3: + # <<: *ydb-node + # container_name: ydb-database-3 + # command: + # - /opt/ydb/bin/ydbd + # - server + # - --grpc-port + # - "2138" + # - --mon-port + # - "8768" + # - --ic-port + # - "19004" + # - --yaml-config + # - /opt/ydb/cfg/config.yaml + # - --tenant + # - /Root/testdb + # - --node-broker + # - grpc://localhost:2135 + # - --label + # - deployment=docker + # ports: + # - 2138:2138 + # - 8768:8768 + # - 19004:19004 + # healthcheck: + # test: bash -c "exec 6<> /dev/tcp/localhost/2138" + # interval: 10s + # timeout: 1s + # retries: 3 + # start_period: 30s + # depends_on: + # static-0: + # condition: service_healthy + # static-init: + # condition: service_completed_successfully + # tenant-init: + # condition: service_completed_successfully + + # database-4: + # <<: *ydb-node + # container_name: ydb-database-4 + # command: + # - /opt/ydb/bin/ydbd + # - server + # - --grpc-port + # - "2139" + # - --mon-port + # - "8769" + # - --ic-port + # - "19005" + # - --yaml-config + # - /opt/ydb/cfg/config.yaml + # - --tenant + # - /Root/testdb + # - --node-broker + # - grpc://localhost:2135 + # - --label + # - deployment=docker + # ports: + # - 2139:2139 + # - 8769:8769 + # - 19005:19005 + # healthcheck: + # test: bash -c "exec 6<> /dev/tcp/localhost/2139" + # interval: 10s + # timeout: 1s + # retries: 3 + # start_period: 30s + # depends_on: + # static-0: + # condition: service_healthy + # static-init: + # condition: service_completed_successfully + # tenant-init: + # condition: service_completed_successfully + + # database-5: + # <<: *ydb-node + # container_name: ydb-database-5 + # command: + # - /opt/ydb/bin/ydbd + # - server + # - --grpc-port + # - "2140" + # - --mon-port + # - "8770" + # - --ic-port + # - "19006" + # - --yaml-config + # - /opt/ydb/cfg/config.yaml + # - --tenant + # - /Root/testdb + # - --node-broker + # - grpc://localhost:2135 + # - --label + # - deployment=docker + # ports: + # - 2140:2140 + # - 8770:8770 + # - 19006:19006 + # healthcheck: + # test: bash -c "exec 6<> /dev/tcp/localhost/2140" + # interval: 10s + # timeout: 1s + # retries: 3 + # start_period: 30s + # depends_on: + # static-0: + # condition: service_healthy + # static-init: + # condition: service_completed_successfully + # tenant-init: + # condition: service_completed_successfully + + chaos: + image: docker:latest + restart: on-failure + container_name: ydb-chaos + <<: *runtime + entrypoint: ["/bin/sh", "-c", "chmod +x /opt/ydb/chaos.sh && ls -la /opt/ydb && /opt/ydb/chaos.sh"] + volumes: + - ./chaos.sh:/opt/ydb/chaos.sh + - ./ydb.yaml:/opt/ydb/cfg/config.yaml + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + static-0: + condition: service_healthy \ No newline at end of file diff --git a/tests/slo/playground/configs/ydb.yaml b/tests/slo/playground/configs/ydb.yaml new file mode 100644 index 00000000..eb27585d --- /dev/null +++ b/tests/slo/playground/configs/ydb.yaml @@ -0,0 +1,68 @@ +pqconfig: + require_credentials_in_new_protocol: false + +actor_system_config: + cpu_count: 1 + node_type: STORAGE + use_auto_config: true +blob_storage_config: + service_set: + groups: + - erasure_species: none + rings: + - fail_domains: + - vdisk_locations: + - node_id: 1 + path: SectorMap:1:64 + pdisk_category: SSD +# enable grpc server logs +#log_config: +# entry: +# - component: GRPC_SERVER +# level: 8 +channel_profile_config: + profile: + - channel: + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: ssd + - erasure_species: 
none + pdisk_category: 0 + storage_pool_kind: ssd + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: ssd + profile_id: 0 +domains_config: + domain: + - name: Root + storage_pool_types: + - kind: ssd + pool_config: + box_id: 1 + erasure_species: none + kind: ssd + pdisk_filter: + - property: + - type: SSD + vdisk_kind: Default + state_storage: + - ring: + node: [ 1 ] + nto_select: 1 + ssid: 1 +host_configs: + - drive: + - path: SectorMap:1:64 + type: SSD + host_config_id: 1 +hosts: + - host: localhost + host_config_id: 1 + node_id: 1 + port: 19001 + walle_location: + body: 1 + data_center: az-1 + rack: "0" +static_erasure: none \ No newline at end of file diff --git a/tests/slo/slo_runner.sh b/tests/slo/slo_runner.sh new file mode 100755 index 00000000..d44729e7 --- /dev/null +++ b/tests/slo/slo_runner.sh @@ -0,0 +1,8 @@ +docker compose -f playground/configs/compose.yaml down -v +docker compose -f playground/configs/compose.yaml up -d --wait + +../../.venv/bin/python ./src topic-create grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic + +../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --read-threads 0 --time 10 + +../../.venv/bin/python ./src topic-run grpc://localhost:2135 /Root/testdb --path /Root/testdb/slo_topic --prom-pgw "" --write-threads 0 --read-rps 1 --debug --time 600 \ No newline at end of file diff --git a/tests/slo/src/__main__.py b/tests/slo/src/__main__.py index a93f59ba..dd1ae0b7 100644 --- a/tests/slo/src/__main__.py +++ b/tests/slo/src/__main__.py @@ -2,12 +2,14 @@ import logging from options import parse_options -from runner import run_from_args - -logging.basicConfig(level=logging.INFO) +from root_runner import run_from_args if __name__ == "__main__": args = parse_options() gc.disable() + + log_level = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig(level=log_level, format="%(asctime)s %(levelname)-8s %(message)s") + run_from_args(args) diff --git a/tests/slo/src/core/__init__.py b/tests/slo/src/core/__init__.py new file mode 100644 index 00000000..8729095b --- /dev/null +++ b/tests/slo/src/core/__init__.py @@ -0,0 +1 @@ +# Core utilities diff --git a/tests/slo/src/generator.py b/tests/slo/src/core/generator.py similarity index 100% rename from tests/slo/src/generator.py rename to tests/slo/src/core/generator.py diff --git a/tests/slo/src/metrics.py b/tests/slo/src/core/metrics.py similarity index 88% rename from tests/slo/src/metrics.py rename to tests/slo/src/core/metrics.py index 2433c730..8751eb2a 100644 --- a/tests/slo/src/metrics.py +++ b/tests/slo/src/core/metrics.py @@ -1,8 +1,10 @@ +from abc import ABC, abstractmethod + import time from contextlib import contextmanager from importlib.metadata import version from collections.abc import Iterable - +import logging from os import environ environ["PROMETHEUS_DISABLE_CREATED_SERIES"] = "True" @@ -16,7 +18,44 @@ WORKLOAD = environ.get("WORKLOAD", "sync-query") -class Metrics: +logger = logging.getLogger(__name__) + + +class BaseMetrics(ABC): + @abstractmethod + def start(self, labels): + pass + + @abstractmethod + def stop(self, labels, start_time, attempts=1, error=None): + pass + + @abstractmethod + def reset(self): + pass + + +def create_metrics(push_gateway) -> BaseMetrics: + if push_gateway: + logger.info("Creating metrics with push gateway: %s", push_gateway) + return Metrics(push_gateway) + else: + logger.info("Creating dummy metrics") + return DummyMetrics() + + +class 
DummyMetrics(BaseMetrics): + def start(self, labels): + return 0 + + def stop(self, labels, start_time, attempts=1, error=None): + pass + + def reset(self): + pass + + +class Metrics(BaseMetrics): def __init__(self, push_gateway): self._push_gtw = push_gateway self._registry = CollectorRegistry() diff --git a/tests/slo/src/jobs.py b/tests/slo/src/jobs.py deleted file mode 100644 index c9bd7316..00000000 --- a/tests/slo/src/jobs.py +++ /dev/null @@ -1,337 +0,0 @@ -import ydb -import time -import logging -import dataclasses -from random import randint -from typing import Callable, Tuple, Union -from ratelimiter import RateLimiter - -import threading - -from metrics import Metrics, OP_TYPE_WRITE, OP_TYPE_READ - -from generator import RowGenerator - - -READ_QUERY_TEMPLATE = """ -DECLARE $object_id AS Uint64; -SELECT * FROM `{}` WHERE object_id = $object_id AND object_hash = Digest::NumericHash($object_id); -""" - -WRITE_QUERY_TEMPLATE = """ -DECLARE $object_id AS Uint64; -DECLARE $payload_str AS Utf8; -DECLARE $payload_double AS Double; -DECLARE $payload_timestamp AS Timestamp; - -UPSERT INTO `{}` ( - object_id, object_hash, payload_str, payload_double, payload_timestamp -) VALUES ( - $object_id, Digest::NumericHash($object_id), $payload_str, $payload_double, $payload_timestamp -); -""" - - -logger = logging.getLogger(__name__) - - -@dataclasses.dataclass -class RequestParams: - pool: Union[ydb.SessionPool, ydb.QuerySessionPool] - query: str - params: dict - metrics: Metrics - labels: Tuple[str] - request_settings: ydb.BaseRequestSettings - retry_settings: ydb.RetrySettings - check_result_cb: Callable = None - - -def execute_query(params: RequestParams): - attempt = 0 - error = None - - def transaction(session): - nonlocal attempt - attempt += 1 - - result = session.transaction().execute( - params.query, - parameters=params.params, - commit_tx=True, - settings=params.request_settings, - ) - if params.check_result_cb: - params.check_result_cb(result) - - return result - - ts = params.metrics.start(params.labels) - - try: - params.pool.retry_operation_sync(transaction, retry_settings=params.retry_settings) - except ydb.Error as err: - error = err - logger.exception("[labels: %s] Cannot retry error:", params.labels) - except BaseException as err: - error = err - logger.exception("[labels: %s] Unexpected error:", params.labels) - - params.metrics.stop(params.labels, ts, attempts=attempt, error=error) - - -def run_reads(driver, query, max_id, metrics, limiter, runtime, timeout): - start_time = time.time() - - logger.info("Start read workload over table service") - - request_settings = ydb.BaseRequestSettings().with_timeout(timeout) - retry_setting = ydb.RetrySettings( - idempotent=True, - max_session_acquire_timeout=timeout, - ) - - with ydb.SessionPool(driver) as pool: - logger.info("Session pool for read requests created") - - while time.time() - start_time < runtime: - params = {"$object_id": randint(1, max_id)} - with limiter: - - def check_result(result): - assert result[0].rows[0] - - params = RequestParams( - pool=pool, - query=query, - params=params, - metrics=metrics, - labels=(OP_TYPE_READ,), - request_settings=request_settings, - retry_settings=retry_setting, - check_result_cb=check_result, - ) - execute_query(params) - - logger.info("Stop read workload") - - -def run_read_jobs(args, driver, tb_name, max_id, metrics): - logger.info("Start read jobs over table service") - - session = ydb.retry_operation_sync(lambda: driver.table_client.session().create()) - read_q = 
session.prepare(READ_QUERY_TEMPLATE.format(tb_name)) - logger.info("Prepared write query") - - read_limiter = RateLimiter(max_calls=args.read_rps, period=1) - futures = [] - for _ in range(args.read_threads): - future = threading.Thread( - name="slo_run_read", - target=run_reads, - args=(driver, read_q, max_id, metrics, read_limiter, args.time, args.read_timeout / 1000), - ) - future.start() - futures.append(future) - return futures - - -def run_reads_query(driver, query, max_id, metrics, limiter, runtime, timeout): - start_time = time.time() - - logger.info("Start read workload over query service") - - request_settings = ydb.BaseRequestSettings().with_timeout(timeout) - retry_setting = ydb.RetrySettings( - idempotent=True, - max_session_acquire_timeout=timeout, - ) - - with ydb.QuerySessionPool(driver) as pool: - logger.info("Session pool for read requests created") - - while time.time() - start_time < runtime: - params = {"$object_id": (randint(1, max_id), ydb.PrimitiveType.Uint64)} - with limiter: - - def check_result(result): - with result: - pass - - params = RequestParams( - pool=pool, - query=query, - params=params, - metrics=metrics, - labels=(OP_TYPE_READ,), - request_settings=request_settings, - retry_settings=retry_setting, - check_result_cb=check_result, - ) - execute_query(params) - - logger.info("Stop read workload") - - -def run_read_jobs_query(args, driver, tb_name, max_id, metrics): - logger.info("Start read jobs over query service") - - read_q = READ_QUERY_TEMPLATE.format(tb_name) - - read_limiter = RateLimiter(max_calls=args.read_rps, period=1) - futures = [] - for _ in range(args.read_threads): - future = threading.Thread( - name="slo_run_read_query", - target=run_reads_query, - args=(driver, read_q, max_id, metrics, read_limiter, args.time, args.read_timeout / 1000), - ) - future.start() - futures.append(future) - return futures - - -def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout): - start_time = time.time() - - logger.info("Start write workload over table service") - - request_settings = ydb.BaseRequestSettings().with_timeout(timeout) - retry_setting = ydb.RetrySettings( - idempotent=True, - max_session_acquire_timeout=timeout, - ) - - with ydb.SessionPool(driver) as pool: - logger.info("Session pool for read requests created") - - while time.time() - start_time < runtime: - row = row_generator.get() - params = { - "$object_id": row.object_id, - "$payload_str": row.payload_str, - "$payload_double": row.payload_double, - "$payload_timestamp": row.payload_timestamp, - } - - with limiter: - params = RequestParams( - pool=pool, - query=query, - params=params, - metrics=metrics, - labels=(OP_TYPE_WRITE,), - request_settings=request_settings, - retry_settings=retry_setting, - ) - execute_query(params) - - logger.info("Stop write workload") - - -def run_write_jobs(args, driver, tb_name, max_id, metrics): - logger.info("Start write jobs over table service") - - session = ydb.retry_operation_sync(lambda: driver.table_client.session().create()) - write_q = session.prepare(WRITE_QUERY_TEMPLATE.format(tb_name)) - logger.info("Prepared write query") - - write_limiter = RateLimiter(max_calls=args.write_rps, period=1) - row_generator = RowGenerator(max_id) - - futures = [] - for _ in range(args.write_threads): - future = threading.Thread( - name="slo_run_write", - target=run_writes, - args=(driver, write_q, row_generator, metrics, write_limiter, args.time, args.write_timeout / 1000), - ) - future.start() - futures.append(future) - return futures - - 
-def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, timeout): - start_time = time.time() - - logger.info("Start write workload over query service") - - request_settings = ydb.BaseRequestSettings().with_timeout(timeout) - retry_setting = ydb.RetrySettings( - idempotent=True, - max_session_acquire_timeout=timeout, - ) - - with ydb.QuerySessionPool(driver) as pool: - logger.info("Session pool for read requests created") - - while time.time() - start_time < runtime: - row = row_generator.get() - params = { - "$object_id": (row.object_id, ydb.PrimitiveType.Uint64), - "$payload_str": (row.payload_str, ydb.PrimitiveType.Utf8), - "$payload_double": (row.payload_double, ydb.PrimitiveType.Double), - "$payload_timestamp": (row.payload_timestamp, ydb.PrimitiveType.Timestamp), - } - - def check_result(result): - # we have to close stream by reading it till the end - with result: - pass - - with limiter: - params = RequestParams( - pool=pool, - query=query, - params=params, - metrics=metrics, - labels=(OP_TYPE_WRITE,), - request_settings=request_settings, - retry_settings=retry_setting, - check_result_cb=check_result, - ) - execute_query(params) - - logger.info("Stop write workload") - - -def run_write_jobs_query(args, driver, tb_name, max_id, metrics): - logger.info("Start write jobs for query service") - - write_q = WRITE_QUERY_TEMPLATE.format(tb_name) - - write_limiter = RateLimiter(max_calls=args.write_rps, period=1) - row_generator = RowGenerator(max_id) - - futures = [] - for _ in range(args.write_threads): - future = threading.Thread( - name="slo_run_write_query", - target=run_writes_query, - args=(driver, write_q, row_generator, metrics, write_limiter, args.time, args.write_timeout / 1000), - ) - future.start() - futures.append(future) - return futures - - -def push_metric(limiter, runtime, metrics): - start_time = time.time() - logger.info("Start push metrics") - - while time.time() - start_time < runtime: - with limiter: - metrics.push() - - logger.info("Stop push metrics") - - -def run_metric_job(args, metrics): - limiter = RateLimiter(max_calls=10**6 // args.report_period, period=1) - future = threading.Thread( - name="slo_run_metrics", - target=push_metric, - args=(limiter, args.time, metrics), - ) - future.start() - return future diff --git a/tests/slo/src/jobs/__init__.py b/tests/slo/src/jobs/__init__.py new file mode 100644 index 00000000..fecdf448 --- /dev/null +++ b/tests/slo/src/jobs/__init__.py @@ -0,0 +1 @@ +# Job modules diff --git a/tests/slo/src/jobs/base.py b/tests/slo/src/jobs/base.py new file mode 100644 index 00000000..6cef5e14 --- /dev/null +++ b/tests/slo/src/jobs/base.py @@ -0,0 +1,44 @@ +from abc import ABC, abstractmethod +import logging +import threading +import time +from ratelimiter import RateLimiter + +import ydb + +logger = logging.getLogger(__name__) + + +class BaseJobManager(ABC): + def __init__(self, driver, args, metrics): + self.driver: ydb.Driver = driver + self.args = args + self.metrics = metrics + + @abstractmethod + def run_tests(self): + pass + + def _run_metric_job(self): + if not self.args.prom_pgw: + return [] + + limiter = RateLimiter(max_calls=10**6 // self.args.report_period, period=1) + + future = threading.Thread( + name="slo_metrics_sender", + target=self._metric_sender, + args=(limiter, self.args.time), + ) + future.start() + return [future] + + def _metric_sender(self, limiter, runtime): + start_time = time.time() + logger.info("Start push metrics") + + while time.time() - start_time < runtime: + with limiter: + 
self.metrics.push() + + logger.info("Stop push metrics") diff --git a/tests/slo/src/jobs/table_jobs.py b/tests/slo/src/jobs/table_jobs.py new file mode 100644 index 00000000..7947ba74 --- /dev/null +++ b/tests/slo/src/jobs/table_jobs.py @@ -0,0 +1,325 @@ +import ydb +import time +import logging +import threading +from random import randint +from ratelimiter import RateLimiter + +from .base import BaseJobManager +from core.metrics import OP_TYPE_READ, OP_TYPE_WRITE +from core.generator import RowGenerator + +logger = logging.getLogger(__name__) + +READ_QUERY_TEMPLATE = """ +DECLARE $object_id AS Uint64; +SELECT * FROM `{}` WHERE object_id = $object_id AND object_hash = Digest::NumericHash($object_id); +""" + +WRITE_QUERY_TEMPLATE = """ +DECLARE $object_id AS Uint64; +DECLARE $payload_str AS Utf8; +DECLARE $payload_double AS Double; +DECLARE $payload_timestamp AS Timestamp; + +UPSERT INTO `{}` ( + object_id, object_hash, payload_str, payload_double, payload_timestamp +) VALUES ( + $object_id, Digest::NumericHash($object_id), $payload_str, $payload_double, $payload_timestamp +); +""" + + +class RequestParams: + def __init__(self, pool, query, params, metrics, labels, request_settings, retry_settings, check_result_cb=None): + self.pool = pool + self.query = query + self.params = params + self.metrics = metrics + self.labels = labels + self.request_settings = request_settings + self.retry_settings = retry_settings + self.check_result_cb = check_result_cb + + +def execute_query(params: RequestParams): + attempt = 0 + error = None + + def transaction(session): + nonlocal attempt + attempt += 1 + + result = session.transaction().execute( + params.query, + parameters=params.params, + commit_tx=True, + settings=params.request_settings, + ) + if params.check_result_cb: + params.check_result_cb(result) + + return result + + ts = params.metrics.start(params.labels) + + try: + params.pool.retry_operation_sync(transaction, retry_settings=params.retry_settings) + except ydb.Error as err: + error = err + logger.exception("[labels: %s] Cannot retry error:", params.labels) + except BaseException as err: + error = err + logger.exception("[labels: %s] Unexpected error:", params.labels) + + params.metrics.stop(params.labels, ts, attempts=attempt, error=error) + + +class TableJobManager(BaseJobManager): + def __init__(self, driver, args, metrics, table_name, max_id): + super().__init__(driver, args, metrics) + self.table_name = table_name + self.max_id = max_id + + from core.metrics import WORKLOAD + + self.workload_type = WORKLOAD + + def run_tests(self): + if self.workload_type == "sync-table": + futures = [ + *self._run_table_read_jobs(), + *self._run_table_write_jobs(), + *self._run_metric_job(), + ] + elif self.workload_type == "sync-query": + futures = [ + *self._run_query_read_jobs(), + *self._run_query_write_jobs(), + *self._run_metric_job(), + ] + else: + raise ValueError(f"Unsupported workload type: {self.workload_type}") + + for future in futures: + future.join() + + def _run_table_read_jobs(self): + logger.info("Start table read jobs") + + session = ydb.retry_operation_sync(lambda: self.driver.table_client.session().create()) + read_query = session.prepare(READ_QUERY_TEMPLATE.format(self.table_name)) + + read_limiter = RateLimiter(max_calls=self.args.read_rps, period=1) + + futures = [] + for i in range(self.args.read_threads): + future = threading.Thread( + name=f"slo_table_read_{i}", + target=self._run_table_reads, + args=(read_query, read_limiter), + ) + future.start() + futures.append(future) + + 
return futures + + def _run_table_write_jobs(self): + logger.info("Start table write jobs") + + session = ydb.retry_operation_sync(lambda: self.driver.table_client.session().create()) + write_query = session.prepare(WRITE_QUERY_TEMPLATE.format(self.table_name)) + + write_limiter = RateLimiter(max_calls=self.args.write_rps, period=1) + + futures = [] + for i in range(self.args.write_threads): + future = threading.Thread( + name=f"slo_table_write_{i}", + target=self._run_table_writes, + args=(write_query, write_limiter), + ) + future.start() + futures.append(future) + + return futures + + def _run_query_read_jobs(self): + logger.info("Start query read jobs") + + read_query = READ_QUERY_TEMPLATE.format(self.table_name) + read_limiter = RateLimiter(max_calls=self.args.read_rps, period=1) + + futures = [] + for i in range(self.args.read_threads): + future = threading.Thread( + name=f"slo_query_read_{i}", + target=self._run_query_reads, + args=(read_query, read_limiter), + ) + future.start() + futures.append(future) + + return futures + + def _run_query_write_jobs(self): + logger.info("Start query write jobs") + + write_query = WRITE_QUERY_TEMPLATE.format(self.table_name) + write_limiter = RateLimiter(max_calls=self.args.write_rps, period=1) + + futures = [] + for i in range(self.args.write_threads): + future = threading.Thread( + name=f"slo_query_write_{i}", + target=self._run_query_writes, + args=(write_query, write_limiter), + ) + future.start() + futures.append(future) + + return futures + + def _run_table_reads(self, query, limiter): + start_time = time.time() + logger.info("Start table read workload") + + request_settings = ydb.BaseRequestSettings().with_timeout(self.args.read_timeout / 1000) + retry_setting = ydb.RetrySettings( + idempotent=True, + max_session_acquire_timeout=self.args.read_timeout / 1000, + ) + + with ydb.SessionPool(self.driver) as pool: + while time.time() - start_time < self.args.time: + params = {"$object_id": randint(1, self.max_id)} + + with limiter: + + def check_result(result): + assert result[0].rows[0] + + request_params = RequestParams( + pool=pool, + query=query, + params=params, + metrics=self.metrics, + labels=(OP_TYPE_READ,), + request_settings=request_settings, + retry_settings=retry_setting, + check_result_cb=check_result, + ) + execute_query(request_params) + + logger.info("Stop table read workload") + + def _run_table_writes(self, query, limiter): + start_time = time.time() + logger.info("Start table write workload") + + request_settings = ydb.BaseRequestSettings().with_timeout(self.args.write_timeout / 1000) + retry_setting = ydb.RetrySettings( + idempotent=True, + max_session_acquire_timeout=self.args.write_timeout / 1000, + ) + + row_generator = RowGenerator(self.max_id) + + with ydb.SessionPool(self.driver) as pool: + while time.time() - start_time < self.args.time: + row = row_generator.get() + params = { + "$object_id": row.object_id, + "$payload_str": row.payload_str, + "$payload_double": row.payload_double, + "$payload_timestamp": row.payload_timestamp, + } + + with limiter: + request_params = RequestParams( + pool=pool, + query=query, + params=params, + metrics=self.metrics, + labels=(OP_TYPE_WRITE,), + request_settings=request_settings, + retry_settings=retry_setting, + ) + execute_query(request_params) + + logger.info("Stop table write workload") + + def _run_query_reads(self, query, limiter): + start_time = time.time() + logger.info("Start query read workload") + + request_settings = 
ydb.BaseRequestSettings().with_timeout(self.args.read_timeout / 1000) + retry_setting = ydb.RetrySettings( + idempotent=True, + max_session_acquire_timeout=self.args.read_timeout / 1000, + ) + + with ydb.QuerySessionPool(self.driver) as pool: + while time.time() - start_time < self.args.time: + params = {"$object_id": (randint(1, self.max_id), ydb.PrimitiveType.Uint64)} + + with limiter: + + def check_result(result): + with result: + pass + + request_params = RequestParams( + pool=pool, + query=query, + params=params, + metrics=self.metrics, + labels=(OP_TYPE_READ,), + request_settings=request_settings, + retry_settings=retry_setting, + check_result_cb=check_result, + ) + execute_query(request_params) + + logger.info("Stop query read workload") + + def _run_query_writes(self, query, limiter): + start_time = time.time() + logger.info("Start query write workload") + + request_settings = ydb.BaseRequestSettings().with_timeout(self.args.write_timeout / 1000) + retry_setting = ydb.RetrySettings( + idempotent=True, + max_session_acquire_timeout=self.args.write_timeout / 1000, + ) + + row_generator = RowGenerator(self.max_id) + + with ydb.QuerySessionPool(self.driver) as pool: + while time.time() - start_time < self.args.time: + row = row_generator.get() + params = { + "$object_id": (row.object_id, ydb.PrimitiveType.Uint64), + "$payload_str": (row.payload_str, ydb.PrimitiveType.Utf8), + "$payload_double": (row.payload_double, ydb.PrimitiveType.Double), + "$payload_timestamp": (row.payload_timestamp, ydb.PrimitiveType.Timestamp), + } + + def check_result(result): + with result: + pass + + with limiter: + request_params = RequestParams( + pool=pool, + query=query, + params=params, + metrics=self.metrics, + labels=(OP_TYPE_WRITE,), + request_settings=request_settings, + retry_settings=retry_setting, + check_result_cb=check_result, + ) + execute_query(request_params) + + logger.info("Stop query write workload") diff --git a/tests/slo/src/jobs/topic_jobs.py b/tests/slo/src/jobs/topic_jobs.py new file mode 100644 index 00000000..3d7af62d --- /dev/null +++ b/tests/slo/src/jobs/topic_jobs.py @@ -0,0 +1,119 @@ +import ydb +import time +import logging +import threading +from ratelimiter import RateLimiter + +from .base import BaseJobManager +from core.metrics import OP_TYPE_READ, OP_TYPE_WRITE + +logger = logging.getLogger(__name__) + + +class TopicJobManager(BaseJobManager): + def run_tests(self): + futures = [ + *self._run_topic_write_jobs(), + *self._run_topic_read_jobs(), + *self._run_metric_job(), + ] + + for future in futures: + future.join() + + def _run_topic_write_jobs(self): + logger.info("Start topic write jobs") + + write_limiter = RateLimiter(max_calls=self.args.write_rps, period=1) + + futures = [] + for i in range(self.args.write_threads): + future = threading.Thread( + name=f"slo_topic_write_{i}", + target=self._run_topic_writes, + args=( + write_limiter, + i, + ), + ) + future.start() + futures.append(future) + + return futures + + def _run_topic_read_jobs(self): + logger.info("Start topic read jobs") + + read_limiter = RateLimiter(max_calls=self.args.read_rps, period=1) + + futures = [] + for i in range(self.args.read_threads): + future = threading.Thread( + name=f"slo_topic_read_{i}", + target=self._run_topic_reads, + args=(read_limiter,), + ) + future.start() + futures.append(future) + + return futures + + def _run_topic_writes(self, limiter, partition_id=None): + start_time = time.time() + logger.info("Start topic write workload") + + with self.driver.topic_client.writer( + 
self.args.path, + codec=ydb.TopicCodec.GZIP, + partition_id=partition_id, + ) as writer: + logger.info("Topic writer created") + + message_count = 0 + while time.time() - start_time < self.args.time: + with limiter: + message_count += 1 + + content = f"message_{message_count}_{threading.current_thread().name}".encode("utf-8") + + if len(content) < self.args.message_size: + content += b"x" * (self.args.message_size - len(content)) + + message = ydb.TopicWriterMessage(data=content) + + ts = self.metrics.start((OP_TYPE_WRITE,)) + try: + writer.write_with_ack(message) + logger.info("Write message: %s", content) + self.metrics.stop((OP_TYPE_WRITE,), ts) + except Exception as e: + self.metrics.stop((OP_TYPE_WRITE,), ts, error=e) + logger.error("Write error: %s", e) + + logger.info("Stop topic write workload") + + def _run_topic_reads(self, limiter): + start_time = time.time() + logger.info("Start topic read workload") + + with self.driver.topic_client.reader( + self.args.path, + self.args.consumer, + ) as reader: + logger.info("Topic reader created") + + while time.time() - start_time < self.args.time: + with limiter: + ts = self.metrics.start((OP_TYPE_READ,)) + try: + msg = reader.receive_message() + if msg is not None: + logger.info("Read message: %s", msg.data.decode()) + reader.commit_with_ack(msg) + + self.metrics.stop((OP_TYPE_READ,), ts) + except Exception as e: + self.metrics.stop((OP_TYPE_READ,), ts, error=e) + logger.error("Read error: %s", e) + + logger.info("Stop topic read workload") diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py index 6c24422d..a634bc89 100644 --- a/tests/slo/src/options.py +++ b/tests/slo/src/options.py @@ -5,54 +5,100 @@ def add_common_options(parser): parser.add_argument("endpoint", help="YDB endpoint") parser.add_argument("db", help="YDB database name") parser.add_argument("-t", "--table-name", default="key_value", help="Table name") + parser.add_argument("--debug", action="store_true", help="Enable debug logging") -def make_create_parser(subparsers): - create_parser = subparsers.add_parser("create", help="Create tables and fill with initial content") - add_common_options(create_parser) +def make_table_create_parser(subparsers): + table_create_parser = subparsers.add_parser("table-create", help="Create tables and fill with initial content") + add_common_options(table_create_parser) - create_parser.add_argument( + table_create_parser.add_argument( "-p-min", "--min-partitions-count", default=6, type=int, help="Minimum amount of partitions in table" ) - create_parser.add_argument( + table_create_parser.add_argument( "-p-max", "--max-partitions-count", default=1000, type=int, help="Maximum amount of partitions in table" ) - create_parser.add_argument("-p-size", "--partition-size", default=100, type=int, help="Partition size [mb]") - create_parser.add_argument( + table_create_parser.add_argument("-p-size", "--partition-size", default=100, type=int, help="Partition size [mb]") + table_create_parser.add_argument( "-c", "--initial-data-count", default=1000, type=int, help="Total number of records to generate" ) - create_parser.add_argument("--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]") + table_create_parser.add_argument( + "--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]" + ) - create_parser.add_argument( + table_create_parser.add_argument( "--batch-size", default=100, type=int, help="Number of new records in each create request" ) - 
create_parser.add_argument("--threads", default=10, type=int, help="Number of threads to use") + table_create_parser.add_argument("--threads", default=10, type=int, help="Number of threads to use") + + +def make_table_run_parser(subparsers): + table_run_parser = subparsers.add_parser("table-run", help="Run table SLO workload") + add_common_options(table_run_parser) + + table_run_parser.add_argument("--read-rps", default=100, type=int, help="Read request rps") + table_run_parser.add_argument( + "--read-timeout", default=10000, type=int, help="Read requests execution timeout [ms]" + ) + + table_run_parser.add_argument("--write-rps", default=10, type=int, help="Write request rps") + table_run_parser.add_argument( + "--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]" + ) + + table_run_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") + table_run_parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time in seconds") + table_run_parser.add_argument("--prom-pgw", default="localhost:9091", type=str, help="Prometheus push gateway") + table_run_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") -def make_run_parser(subparsers, name="run"): - run_parser = subparsers.add_parser(name, help="Run measurable workload") - add_common_options(run_parser) + table_run_parser.add_argument("--read-threads", default=8, type=int, help="Number of threads to use for write") + table_run_parser.add_argument("--write-threads", default=4, type=int, help="Number of threads to use for read") - run_parser.add_argument("--read-rps", default=100, type=int, help="Read request rps") - run_parser.add_argument("--read-timeout", default=10000, type=int, help="Read requests execution timeout [ms]") - run_parser.add_argument("--write-rps", default=10, type=int, help="Write request rps") - run_parser.add_argument("--write-timeout", default=20000, type=int, help="Write requests execution timeout [ms]") +def make_table_cleanup_parser(subparsers): + table_cleanup_parser = subparsers.add_parser("table-cleanup", help="Drop tables") + add_common_options(table_cleanup_parser) - run_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") - run_parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time in seconds") - run_parser.add_argument("--prom-pgw", default="localhost:9091", type=str, help="Prometheus push gateway") - run_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") +def make_topic_create_parser(subparsers): + topic_create_parser = subparsers.add_parser("topic-create", help="Create topic with consumer") + add_common_options(topic_create_parser) - run_parser.add_argument("--read-threads", default=8, type=int, help="Number of threads to use for write") - run_parser.add_argument("--write-threads", default=4, type=int, help="Number of threads to use for read") + topic_create_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path") + topic_create_parser.add_argument("--consumer", default="slo_consumer", type=str, help="Topic consumer name") + topic_create_parser.add_argument("--partitions-count", default=1, type=int, help="Partition count") -def make_cleanup_parser(subparsers): - cleanup_parser = subparsers.add_parser("cleanup", help="Drop tables") - add_common_options(cleanup_parser) +def make_topic_run_parser(subparsers): + topic_parser = 
subparsers.add_parser("topic-run", help="Run topic SLO workload") + add_common_options(topic_parser) + + topic_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path") + topic_parser.add_argument("--consumer", default="slo_consumer", type=str, help="Topic consumer name") + topic_parser.add_argument("--partitions-count", default=1, type=int, help="Partition count") + topic_parser.add_argument("--read-rps", default=100, type=int, help="Topic read request rps") + topic_parser.add_argument("--read-timeout", default=5000, type=int, help="Topic read timeout [ms]") + topic_parser.add_argument("--write-rps", default=100, type=int, help="Topic write request rps") + topic_parser.add_argument("--write-timeout", default=5000, type=int, help="Topic write timeout [ms]") + topic_parser.add_argument("--read-threads", default=1, type=int, help="Number of threads for topic reading") + topic_parser.add_argument("--write-threads", default=1, type=int, help="Number of threads for topic writing") + topic_parser.add_argument("--message-size", default=100, type=int, help="Topic message size in bytes") + + topic_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") + topic_parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time in seconds") + topic_parser.add_argument( + "--prom-pgw", default="localhost:9091", type=str, help="Prometheus push gateway (empty to disable)" + ) + topic_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") + + +def make_topic_cleanup_parser(subparsers): + topic_cleanup_parser = subparsers.add_parser("topic-cleanup", help="Drop topic") + add_common_options(topic_cleanup_parser) + + topic_cleanup_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path") def get_root_parser(): @@ -67,9 +113,13 @@ def get_root_parser(): help="List of subcommands", ) - make_create_parser(subparsers) - make_run_parser(subparsers) - make_cleanup_parser(subparsers) + make_table_create_parser(subparsers) + make_table_run_parser(subparsers) + make_table_cleanup_parser(subparsers) + + make_topic_create_parser(subparsers) + make_topic_run_parser(subparsers) + make_topic_cleanup_parser(subparsers) return parser diff --git a/tests/slo/src/root_runner.py b/tests/slo/src/root_runner.py new file mode 100644 index 00000000..3bf8a8a0 --- /dev/null +++ b/tests/slo/src/root_runner.py @@ -0,0 +1,63 @@ +import ydb +import logging +from typing import Dict + +from runners.topic_runner import TopicRunner +from runners.table_runner import TableRunner +from runners.base import BaseRunner + +logger = logging.getLogger(__name__) + + +class SLORunner: + def __init__(self): + self.runners: Dict[str, type(BaseRunner)] = {} + + def register_runner(self, prefix: str, runner_cls: type(BaseRunner)): + self.runners[prefix] = runner_cls + + def run_command(self, args): + subcommand_parts = args.subcommand.split("-", 1) + if len(subcommand_parts) < 2: + raise ValueError(f"Invalid subcommand format: {args.subcommand}. Expected 'prefix-command'") + + prefix, command = subcommand_parts + if prefix not in self.runners: + raise ValueError(f"Unknown prefix: {prefix}. 
Available: {list(self.runners.keys())}") + + runner_instance = self.runners[prefix]() + driver_config = ydb.DriverConfig( + args.endpoint, + database=args.db, + grpc_keep_alive_timeout=5000, + ) + + with ydb.Driver(driver_config) as driver: + driver.wait(timeout=300) + try: + runner_instance.set_driver(driver) + if command == "create": + runner_instance.create(args) + elif command == "run": + runner_instance.run(args) + elif command == "cleanup": + runner_instance.cleanup(args) + else: + raise RuntimeError(f"Unknown command {command} for prefix {prefix}") + except BaseException: + logger.exception("Something went wrong") + raise + finally: + driver.stop(timeout=getattr(args, "shutdown_time", 10)) + + +def create_runner() -> SLORunner: + runner = SLORunner() + runner.register_runner("table", TableRunner) + runner.register_runner("topic", TopicRunner) + return runner + + +def run_from_args(args): + runner = create_runner() + runner.run_command(args) diff --git a/tests/slo/src/runner.py b/tests/slo/src/runner.py deleted file mode 100644 index a636309e..00000000 --- a/tests/slo/src/runner.py +++ /dev/null @@ -1,144 +0,0 @@ -import ydb -import logging - -from os import path -from generator import batch_generator - -import concurrent.futures -from concurrent.futures import ThreadPoolExecutor - -from jobs import ( - run_read_jobs, - run_write_jobs, - run_read_jobs_query, - run_write_jobs_query, - run_metric_job, -) -from metrics import Metrics, WORKLOAD - -logger = logging.getLogger(__name__) - - -INSERT_ROWS_TEMPLATE = """ -DECLARE $items AS List>; -UPSERT INTO `{}` -SELECT Digest::NumericHash(object_id) AS object_hash, object_id, payload_str, payload_double, payload_timestamp -FROM AS_TABLE($items); -""" - - -def insert_rows(pool, prepared, data, timeout): - def transaction(session: ydb.table.Session): - session.transaction().execute( - prepared, - {"$items": data}, - commit_tx=True, - settings=ydb.BaseRequestSettings().with_timeout(timeout), - ) - - pool.retry_operation_sync(transaction) - logger.info("Insert %s rows", len(data)) - - -def run_create(args, driver, tb_name): - timeout = args.write_timeout / 1000 - - def create_table(session): - session.create_table( - tb_name, - ydb.TableDescription() - .with_column(ydb.Column("object_hash", ydb.OptionalType(ydb.PrimitiveType.Uint64))) - .with_column(ydb.Column("object_id", ydb.OptionalType(ydb.PrimitiveType.Uint64))) - .with_column(ydb.Column("payload_str", ydb.OptionalType(ydb.PrimitiveType.Utf8))) - .with_column(ydb.Column("payload_double", ydb.OptionalType(ydb.PrimitiveType.Double))) - .with_column(ydb.Column("payload_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp))) - .with_primary_keys("object_hash", "object_id") - .with_uniform_partitions(args.min_partitions_count) - .with_partitioning_settings( - ydb.PartitioningSettings() - .with_partitioning_by_size(ydb.FeatureFlag.ENABLED) - .with_min_partitions_count(args.min_partitions_count) - .with_max_partitions_count(args.max_partitions_count) - .with_partition_size_mb(args.partition_size) - ), - settings=ydb.BaseRequestSettings().with_timeout(timeout), - ) - - return session.prepare(INSERT_ROWS_TEMPLATE.format(tb_name)) - - with ydb.SessionPool(driver) as pool: - prepared = pool.retry_operation_sync(create_table) - - futures = set() - with ThreadPoolExecutor(max_workers=args.threads, thread_name_prefix="slo_create") as executor: - for batch in batch_generator(args): - futures.add(executor.submit(insert_rows, pool, prepared, batch, timeout)) - for f in 
concurrent.futures.as_completed(futures): - f.result() - - -def run_slo(args, driver, tb_name): - session = driver.table_client.session().create() - result = session.transaction().execute( - "SELECT MAX(`object_id`) as max_id FROM `{}`".format(tb_name), - commit_tx=True, - ) - max_id = result[0].rows[0]["max_id"] - logger.info("Max ID: %s", max_id) - - metrics = Metrics(args.prom_pgw) - if WORKLOAD == "sync-table": - futures = ( - *run_read_jobs(args, driver, tb_name, max_id, metrics), - *run_write_jobs(args, driver, tb_name, max_id, metrics), - run_metric_job(args, metrics), - ) - elif WORKLOAD == "sync-query": - futures = ( - *run_read_jobs_query(args, driver, tb_name, max_id, metrics), - *run_write_jobs_query(args, driver, tb_name, max_id, metrics), - run_metric_job(args, metrics), - ) - else: - raise ValueError(f"Unsupported service: {WORKLOAD}") - - for future in futures: - future.join() - - metrics.reset() - - -def run_cleanup(args, driver, tb_name): - session = driver.table_client.session().create() - session.drop_table(tb_name) - - -def run_from_args(args): - driver_config = ydb.DriverConfig( - args.endpoint, - database=args.db, - grpc_keep_alive_timeout=5000, - ) - - table_name = path.join(args.db, args.table_name) - - with ydb.Driver(driver_config) as driver: - driver.wait(timeout=300) - try: - if args.subcommand == "create": - run_create(args, driver, table_name) - elif args.subcommand == "run": - run_slo(args, driver, table_name) - elif args.subcommand == "cleanup": - run_cleanup(args, driver, table_name) - else: - raise RuntimeError(f"Unknown command {args.subcommand}") - except BaseException: - logger.exception("Something went wrong") - raise - finally: - driver.stop(timeout=getattr(args, "shutdown_time", 10)) diff --git a/tests/slo/src/runners/__init__.py b/tests/slo/src/runners/__init__.py new file mode 100644 index 00000000..3f288356 --- /dev/null +++ b/tests/slo/src/runners/__init__.py @@ -0,0 +1 @@ +# Runner modules diff --git a/tests/slo/src/runners/base.py b/tests/slo/src/runners/base.py new file mode 100644 index 00000000..1f9eda2e --- /dev/null +++ b/tests/slo/src/runners/base.py @@ -0,0 +1,29 @@ +import ydb +import logging +from abc import ABC, abstractmethod + + +class BaseRunner(ABC): + def __init__(self): + self.logger = logging.getLogger(self.__class__.__module__) + self.driver = None + + @property + @abstractmethod + def prefix(self) -> str: + pass + + def set_driver(self, driver: ydb.Driver): + self.driver = driver + + @abstractmethod + def create(self, args): + pass + + @abstractmethod + def run(self, args): + pass + + @abstractmethod + def cleanup(self, args): + pass diff --git a/tests/slo/src/runners/table_runner.py b/tests/slo/src/runners/table_runner.py new file mode 100644 index 00000000..797ae02e --- /dev/null +++ b/tests/slo/src/runners/table_runner.py @@ -0,0 +1,113 @@ +import ydb +from os import path +import concurrent.futures +from concurrent.futures import ThreadPoolExecutor + +from .base import BaseRunner +from jobs.table_jobs import TableJobManager +from core.metrics import create_metrics +from core.generator import batch_generator + + +class TableRunner(BaseRunner): + @property + def prefix(self) -> str: + return "table" + + def create(self, args): + table_name = path.join(args.db, args.table_name) + timeout = args.write_timeout / 1000 + + self.logger.info("Creating table: %s", table_name) + + INSERT_ROWS_TEMPLATE = """ + DECLARE $items AS List>; + UPSERT INTO `{}` + SELECT Digest::NumericHash(object_id) AS object_hash, object_id, payload_str, 
payload_double, payload_timestamp + FROM AS_TABLE($items); + """ + + def create_table(session): + session.create_table( + table_name, + ydb.TableDescription() + .with_column(ydb.Column("object_hash", ydb.OptionalType(ydb.PrimitiveType.Uint64))) + .with_column(ydb.Column("object_id", ydb.OptionalType(ydb.PrimitiveType.Uint64))) + .with_column(ydb.Column("payload_str", ydb.OptionalType(ydb.PrimitiveType.Utf8))) + .with_column(ydb.Column("payload_double", ydb.OptionalType(ydb.PrimitiveType.Double))) + .with_column(ydb.Column("payload_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp))) + .with_primary_keys("object_hash", "object_id") + .with_uniform_partitions(args.min_partitions_count) + .with_partitioning_settings( + ydb.PartitioningSettings() + .with_partitioning_by_size(ydb.FeatureFlag.ENABLED) + .with_min_partitions_count(args.min_partitions_count) + .with_max_partitions_count(args.max_partitions_count) + .with_partition_size_mb(args.partition_size) + ), + settings=ydb.BaseRequestSettings().with_timeout(timeout), + ) + + return session.prepare(INSERT_ROWS_TEMPLATE.format(table_name)) + + def insert_rows(pool, prepared, data, timeout): + def transaction(session: ydb.table.Session): + session.transaction().execute( + prepared, + {"$items": data}, + commit_tx=True, + settings=ydb.BaseRequestSettings().with_timeout(timeout), + ) + + pool.retry_operation_sync(transaction) + self.logger.info("Insert %s rows", len(data)) + + with ydb.SessionPool(self.driver) as pool: + prepared = pool.retry_operation_sync(create_table) + self.logger.info("Table created: %s", table_name) + + self.logger.info("Filling table with initial data") + futures = set() + with ThreadPoolExecutor(max_workers=args.threads, thread_name_prefix="slo_create") as executor: + for batch in batch_generator(args): + futures.add(executor.submit(insert_rows, pool, prepared, batch, timeout)) + for f in concurrent.futures.as_completed(futures): + f.result() + + self.logger.info("Table creation completed") + + def run(self, args): + metrics = create_metrics(args.prom_pgw) + + self.logger.info("Starting table SLO tests") + + table_name = path.join(args.db, args.table_name) + + session = self.driver.table_client.session().create() + result = session.transaction().execute( + "SELECT MAX(`object_id`) as max_id FROM `{}`".format(table_name), + commit_tx=True, + ) + max_id = result[0].rows[0]["max_id"] + self.logger.info("Max ID: %s", max_id) + + job_manager = TableJobManager(self.driver, args, metrics, table_name, max_id) + job_manager.run_tests() + + self.logger.info("Table SLO tests completed") + + if hasattr(metrics, "reset"): + metrics.reset() + + def cleanup(self, args): + table_name = path.join(args.db, args.table_name) + self.logger.info("Cleaning up table: %s", table_name) + + session = self.driver.table_client.session().create() + session.drop_table(table_name) + + self.logger.info("Table dropped: %s", table_name) diff --git a/tests/slo/src/runners/topic_runner.py b/tests/slo/src/runners/topic_runner.py new file mode 100644 index 00000000..7a9be942 --- /dev/null +++ b/tests/slo/src/runners/topic_runner.py @@ -0,0 +1,90 @@ +import time +import ydb + +from .base import BaseRunner +from jobs.topic_jobs import TopicJobManager +from core.metrics import create_metrics + + +class TopicRunner(BaseRunner): + @property + def prefix(self) -> str: + return "topic" + + def create(self, args): + retry_no = 0 + while retry_no < 3: + self.logger.info("Creating topic: %s (retry no: %d)", args.path, retry_no) + + try: + 
self.driver.topic_client.create_topic( + path=args.path, + min_active_partitions=args.partitions_count, + max_active_partitions=args.partitions_count, + consumers=[args.consumer], + ) + + self.logger.info("Topic created successfully: %s", args.path) + self.logger.info("Consumer created: %s", args.consumer) + return + + except ydb.Error as e: + error_msg = str(e).lower() + if "already exists" in error_msg: + self.logger.info("Topic already exists: %s", args.path) + + try: + description = self.driver.topic_client.describe_topic(args.path) + consumer_exists = any(c.name == args.consumer for c in description.consumers) + + if not consumer_exists: + self.logger.info("Adding consumer %s to existing topic", args.consumer) + self.driver.topic_client.alter_topic(path=args.path, add_consumers=[args.consumer]) + self.logger.info("Consumer added successfully: %s", args.consumer) + return + else: + self.logger.info("Consumer already exists: %s", args.consumer) + return + + except Exception as alter_err: + self.logger.warning("Failed to add consumer: %s", alter_err) + raise + elif "storage pool" in error_msg or "pq" in error_msg: + self.logger.error("YDB instance does not support topics (PersistentQueues): %s", e) + self.logger.error("Please use YDB instance with topic support") + raise + elif isinstance(e, ydb.Unavailable): + self.logger.info("YDB instance is not ready, retrying in 5 seconds...") + time.sleep(5) + retry_no += 1 + else: + self.logger.error("Failed to create topic: %s", e) + raise + + raise RuntimeError("Failed to create topic") + + def run(self, args): + metrics = create_metrics(args.prom_pgw) + + self.logger.info("Starting topic SLO tests") + + job_manager = TopicJobManager(self.driver, args, metrics) + job_manager.run_tests() + + self.logger.info("Topic SLO tests completed") + + if hasattr(metrics, "reset"): + metrics.reset() + + def cleanup(self, args): + self.logger.info("Cleaning up topic: %s", args.path) + + try: + self.driver.topic_client.drop_topic(args.path) + self.logger.info("Topic dropped: %s", args.path) + except ydb.Error as e: + if "does not exist" in str(e).lower(): + self.logger.info("Topic does not exist: %s", args.path) + else: + self.logger.error("Failed to drop topic: %s", e) + raise
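
Editor's note on the new dispatch layer: the sketch below is a hypothetical illustration, not part of this diff. It shows how the `"<prefix>-<command>"` registry added in `root_runner.py` could be extended with a further workload type. Only `SLORunner`, `BaseRunner` and `create_runner` come from the files above; the `NoopRunner` class and the `"noop"` prefix are invented for the example, and the imports assume `tests/slo/src` is on `PYTHONPATH` (as in the existing entry point).

```python
# Hypothetical sketch only -- not part of this PR.
from root_runner import create_runner   # new module added above
from runners.base import BaseRunner     # new abstract base added above


class NoopRunner(BaseRunner):
    """Does nothing; exists only to show how a runner is registered."""

    @property
    def prefix(self) -> str:
        return "noop"

    def create(self, args):
        self.logger.info("noop-create called")

    def run(self, args):
        self.logger.info("noop-run called")

    def cleanup(self, args):
        self.logger.info("noop-cleanup called")


runner = create_runner()                 # registers the "table" and "topic" runners
runner.register_runner("noop", NoopRunner)
# runner.run_command(args) would now accept "noop-create", "noop-run" and
# "noop-cleanup": it splits args.subcommand on the first "-", opens a
# ydb.Driver against args.endpoint / args.db and calls the matching method.
```

The registry keeps the entry point and the workflow matrix free of per-workload branching: adding a prefix only requires a `BaseRunner` subclass plus the corresponding parser and README entries.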