diff --git a/.gitignore b/.gitignore index 0e42b57..1160218 100644 --- a/.gitignore +++ b/.gitignore @@ -31,6 +31,9 @@ pypriskv/priskv.egg-info/ pypriskv/priskv/*.so include/priskv-version.h pypriskv/priskv/__pycache__/ +pypriskv/**/__pycache__/ +**/__pycache__/ +*.pyc man/priskv-server.1 .vscode/ tmp/ diff --git a/SETUP_UCX_TCP.md b/SETUP_UCX_TCP.md new file mode 100644 index 0000000..aeac072 --- /dev/null +++ b/SETUP_UCX_TCP.md @@ -0,0 +1,193 @@ +# PrisKV: From Zero to Working (UCX TCP + Python client) + +This guide targets developer environments without RDMA hardware and uses UCX over TCP for connectivity. +The goal is: +1. Verify `import priskv` works (Python client is installed) +2. Connect using `PriskvClient` to a running `priskv-server` +3. Optionally validate end-to-end `set/get` + +--- + +## 0. Prerequisites (Debian/Ubuntu) + +### System dependencies + +```bash +apt-get update +apt-get install -y \ + git gcc make cmake \ + librdmacm-dev rdma-core libibverbs-dev \ + libncurses5-dev libmount-dev libevent-dev libssl-dev \ + dpkg-dev debhelper \ + pkg-config \ + python3-pybind11 python3-dev python3-pip \ + libonig-dev libhiredis-dev liburing-dev \ + libucx-dev libucx0 +``` + +### Python build tooling + +```bash +python3 -m venv .venv +source .venv/bin/activate +pip3 install pybind11 yapf==0.32.0 +``` + +If you are running as `root` inside a container, `sudo` is usually not required. + +--- + +## 1. Prepare the source tree + +```bash +cd /PrisKV +``` + +If the repository uses submodules (e.g., `json-c`), initialize them: + +```bash +git submodule update --init --recursive +``` + +--- + +## 2. Build `priskv-server` (C/C++) + +Recommended: build everything once, then rebuild server if needed: + +```bash +cd /PrisKV +make +``` + +Or build only the server (preferred if build without RDMA): + +```bash +cd /PrisKV +make server +``` + +--- + +## 3. Build and install the Python client (`pypriskv`) + +If you use a virtual environment (recommended), activate it first: + +```bash +source .venv/bin/activate +pip install -U pip setuptools wheel +``` + +### Option A: Editable install (recommended for development) + +```bash +cd /PrisKV +make all +cd pypriskv +pip install --no-build-isolation -v -e . +``` + +### Option B: Install from wheel + +```bash +cd /PrisKV +make all +cd pypriskv +python3 setup.py build_ext bdist_wheel +pip install ./dist/*.whl +``` + +--- + +## 4. Configure runtime environment variables for UCX TCP + +Make sure both server and client use the same transport configuration: + +- `PRISKV_TRANSPORT=ucx` +- `UCX_TLS=tcp` +- If using direct mode (no Redis meta service): `PRISKV_CLIENT_DIRECT_MODE=y` + +Example: + +```bash +export PRISKV_TRANSPORT=ucx +export UCX_TLS=tcp +export PRISKV_CLIENT_DIRECT_MODE=y +``` + +Optional debugging: + +```bash +export PRISKV_LOG_LEVEL=debug +``` + +--- + +## 5. Start `priskv-server` (UCX TCP) + +```bash +source .venv/bin/activate +cd /PrisKV +export PRISKV_TRANSPORT=ucx +export UCX_TLS=tcp +export PRISKV_CLIENT_DIRECT_MODE=y +export PRISKV_USE_SHM=n + +./server/priskv-server -a 127.0.0.1 -p 6379 --acl any +``` + +Expected server log includes: +- `UCX: <...> ready` + +--- + +## 6. Verify connectivity from Python + +Activate your venv: + +```bash +source .venv/bin/activate +``` + +Then run a minimal connectivity check: + +```bash +export PRISKV_TRANSPORT=ucx +export UCX_TLS=tcp +export PRISKV_CLIENT_DIRECT_MODE=y + +python - <<'PY' +import priskv + +c = priskv.PriskvClient("127.0.0.1", 6379, "kvcache-redis") +print("connected") +c.close() +print("closed") +PY +``` + +Optional: validate `set/get`: + +```bash +python - <<'PY' +import priskv + +c = priskv.PriskvClient("127.0.0.1", 6379, "kvcache-redis") +print("setstr:", c.setstr("k1", "v1", 2000)) +print("getstr:", c.getstr("k1")) +c.close() +PY +``` + +--- + +## 7. Readiness checklist for developers + +- Successful `import priskv` indicates the Python extension is built/installed correctly. +- Server logs show `UCX ... ready` indicates UCX transport is initialized. +- Client logs show `established` indicates handshake/connection succeeded. +- If `set/get` hangs, investigate the end-to-end request/response path: + - server response generation + - client response receive callback + - protocol struct decoding and completion-info handling + diff --git a/client/Makefile b/client/Makefile index 83887d3..51e30c1 100644 --- a/client/Makefile +++ b/client/Makefile @@ -33,7 +33,14 @@ VALKEY_LDFLAGS = -I$(VALKEY_INCLUDE_PATH) PRISKV_TARGETS_ALL = priskv-client priskv-benchmark priskv-example priskv-test_runtime PRISKV_TARGETS = priskv-client priskv-benchmark PRISKV_TARGETS_SRCS = $(patsubst priskv-%, %.c, $(PRISKV_TARGETS_ALL)) -SRCS = $(filter-out $(PRISKV_TARGETS_SRCS) $(VALKEY_BENCHMARK_SRC), $(wildcard *.c transport/*.c)) + +ifneq (,$(filter $(WITH_RDMA),yes YES y Y 1)) +CFLAGS += -DWITH_RDMA +else +RDMA_EXCLUDE = rdma.c transport/rdma.c +endif + +SRCS = $(filter-out $(PRISKV_TARGETS_SRCS) $(VALKEY_BENCHMARK_SRC) $(RDMA_EXCLUDE), $(wildcard *.c transport/*.c)) OBJS := $(SRCS:%.c=%.o) DEPS := $(OBJS:%.o=%.d) diff --git a/client/benchmark.c b/client/benchmark.c index cb3338e..7f3ec77 100644 --- a/client/benchmark.c +++ b/client/benchmark.c @@ -996,8 +996,9 @@ static void priskv_drv_get(void *ctx, const char *key, void *value, uint32_t val zctx->value = value; zctx->value_len = value_len; priskv_ctx->job->last_stage = "ACQUIRE"; - priskv_acquire_async(priskv_ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, false /* pin_on_acquire */, - (uint64_t)zctx, zc_acquire_cb); + priskv_acquire_async(priskv_ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, + false /* pin_on_acquire */, 0 /* pin_ttl_ms */, (uint64_t)zctx, + zc_acquire_cb); return; } /* Remove duplicate ZeroCopy GET branch */ diff --git a/client/client.c b/client/client.c index 955655a..0a2eb3b 100644 --- a/client/client.c +++ b/client/client.c @@ -275,7 +275,8 @@ static void get_handler_base(client_context *ctx, char *args, bool acquire) priskv_memory_region region = {0}; printf("ACQUIRE key=%s\n", key); /* Do not pin on acquire by default from CLI */ - status = priskv_acquire(ctx->client, key, false, 0 /* pin_ttl_ms */, ®ion); + status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, false, 0 /* pin_ttl_ms */, + ®ion); if (status != PRISKV_STATUS_OK) { printf("Failed to GET, status(%d): %s\n", status, priskv_status_str(status)); return; @@ -477,7 +478,8 @@ static void acquire_only_handler(client_context *ctx, char *args) /* Align output field name with CLI flag semantics */ printf("ACQUIRE key=%s [PIN=%d, TTL(ms)=%" PRIu64 "]\n", key, pin_on_acquire, pin_ttl_ms); - status = priskv_acquire(ctx->client, key, pin_on_acquire, pin_ttl_ms, ®ion); + status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, pin_on_acquire, pin_ttl_ms, + ®ion); printf("ACQUIRE status(%d): %s, addr %p, length %u, token 0x%lx\n", status, priskv_status_str(status), (void *)region.addr, region.length, region.token); if (status == PRISKV_STATUS_OK) { diff --git a/client/priskv.h b/client/priskv.h index 5e5592f..25a9cbf 100644 --- a/client/priskv.h +++ b/client/priskv.h @@ -162,9 +162,10 @@ int priskv_alloc_async(priskv_client *client, const char *key, uint32_t alloc_le int priskv_seal_async(priskv_client *client, const uint64_t *token, bool pin_on_seal, uint64_t pin_ttl_ms, uint64_t request_id, priskv_generic_cb cb); -/* Acquire memory region for zero copy read (pin_ttl_ms is PIN TTL in ms; 0 uses server default) */ -int priskv_acquire_async(priskv_client *client, const char *key, bool pin_on_acquire, - uint64_t pin_ttl_ms, uint64_t request_id, priskv_generic_cb cb); +/* Acquire: @timeout is key/op timeout for transport; @pin_ttl_ms is PIN TTL (0 = server default). */ +int priskv_acquire_async(priskv_client *client, const char *key, uint64_t timeout, + bool pin_on_acquire, uint64_t pin_ttl_ms, uint64_t request_id, + priskv_generic_cb cb); /* Release memory region (by token pointer, reuse key field) */ int priskv_release_async(priskv_client *client, const uint64_t *token, bool unpin_on_release, @@ -224,9 +225,8 @@ int priskv_alloc(priskv_client *client, const char *key, uint32_t alloc_length, int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal, uint64_t pin_ttl_ms); -int priskv_acquire(priskv_client *client, const char *key, bool pin_on_acquire, uint64_t pin_ttl_ms, - priskv_memory_region *region); - +int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout, + bool pin_on_acquire, uint64_t pin_ttl_ms, priskv_memory_region *region); int priskv_release(priskv_client *client, const uint64_t *token, bool unpin_on_release); int priskv_drop(priskv_client *client, const uint64_t *token); diff --git a/client/sync.c b/client/sync.c index bf1fd06..e87227d 100644 --- a/client/sync.c +++ b/client/sync.c @@ -190,12 +190,11 @@ int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal, return req_sync.status; } -int priskv_acquire(priskv_client *client, const char *key, bool pin_on_acquire, uint64_t pin_ttl_ms, - priskv_memory_region *region) +int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout, + bool pin_on_acquire, uint64_t pin_ttl_ms, priskv_memory_region *region) { priskv_transport_zero_copy_req_sync req_sync = {.status = 0xffff, .done = false}; - /* pin_ttl_ms is per-request PIN TTL in ms; 0 uses server default */ - priskv_acquire_async(client, key, pin_on_acquire, pin_ttl_ms, (uint64_t)&req_sync, + priskv_acquire_async(client, key, timeout, pin_on_acquire, pin_ttl_ms, (uint64_t)&req_sync, priskv_zero_copy_req_sync_cb); priskv_sync_wait(client, &req_sync.done); diff --git a/client/transport/transport.c b/client/transport/transport.c index daad80b..58c6aab 100644 --- a/client/transport/transport.c +++ b/client/transport/transport.c @@ -37,7 +37,9 @@ priskv_transport_driver *g_client_driver = NULL; extern priskv_transport_driver priskv_transport_driver_ucx; +#ifdef WITH_RDMA extern priskv_transport_driver priskv_transport_driver_rdma; +#endif static int priskv_build_check(void) { @@ -73,10 +75,12 @@ static void __attribute__((constructor)) priskv_client_transport_init(void) driver = &priskv_transport_driver_ucx; priskv_log_notice("Using UCX transport backend\n"); break; +#ifdef WITH_RDMA case PRISKV_TRANSPORT_BACKEND_RDMA: driver = &priskv_transport_driver_rdma; priskv_log_notice("Using RDMA transport backend\n"); break; +#endif default: priskv_log_error("Unknown transport backend: %d\n", backend); break; @@ -429,17 +433,16 @@ int priskv_seal_async(priskv_client *client, const uint64_t *token, bool pin_on_ { uint32_t flags = pin_on_seal ? PRISKV_REQ_FLAG_PIN_ON_SEAL : 0; priskv_send_command(client, request_id, (const char *)token, 0 /* alloc_length */, NULL, 0, - 0 /* key_expiry_timeout */, pin_ttl_ms /* pin_ttl_ms */, - PRISKV_COMMAND_SEAL, flags, cb); + 0 /* key_expiry_timeout */, pin_ttl_ms, PRISKV_COMMAND_SEAL, flags, cb); return 0; } -int priskv_acquire_async(priskv_client *client, const char *key, bool pin_on_acquire, - uint64_t pin_ttl_ms, uint64_t request_id, priskv_generic_cb cb) +int priskv_acquire_async(priskv_client *client, const char *key, uint64_t timeout, + bool pin_on_acquire, uint64_t pin_ttl_ms, uint64_t request_id, + priskv_generic_cb cb) { uint32_t flags = pin_on_acquire ? PRISKV_REQ_FLAG_PIN_ON_ACQUIRE : 0; - priskv_send_command(client, request_id, key, 0 /* alloc_length */, NULL, 0, - 0 /* key_expiry_timeout */, pin_ttl_ms /* pin_ttl_ms */, + priskv_send_command(client, request_id, key, 0 /* alloc_length */, NULL, 0, timeout, pin_ttl_ms, PRISKV_COMMAND_ACQUIRE, flags, cb); return 0; } diff --git a/client/transport/ucx.c b/client/transport/ucx.c index da328b7..12fa275 100644 --- a/client/transport/ucx.c +++ b/client/transport/ucx.c @@ -303,9 +303,11 @@ static void priskv_ucx_recv_resp_cb(ucs_status_t status, ucp_tag_t sender_tag, s return; } - if (length != sizeof(priskv_response)) { - priskv_log_warn("UCX: recv %d, expected %ld\n", length, sizeof(priskv_response)); - return; + if (ucs_unlikely(length != sizeof(priskv_response))) { + /* UCX 1.12 may provide unreliable `info->length` for tag recv callbacks. + * The response payload is a fixed-size struct, so keep going when UCS_OK. */ + priskv_log_warn("UCX: recv %zu, expected %zu (continuing)\n", length, + sizeof(priskv_response)); } uint64_t request_id = be64toh(resp->request_id); @@ -413,7 +415,6 @@ static int priskv_ucx_handshake(priskv_transport_conn *conn, ucp_address_t **add uint32_t *address_len) { int ret; - ucs_status_t status; uint8_t *peer_worker_address = NULL; size_t hs_size = sizeof(priskv_cm_ucx_handshake) + conn->worker->address_len; @@ -446,21 +447,19 @@ static int priskv_ucx_handshake(priskv_transport_conn *conn, ucp_address_t **add hs->cap.max_inflight_command = htobe16(conn->param.max_inflight_command); hs->address_len = htobe32(conn->worker->address_len); memcpy(hs->address, conn->worker->address, conn->worker->address_len); - status = ucs_socket_send(conn->connfd, hs, hs_size); + ret = priskv_safe_send(conn->connfd, hs, hs_size, NULL, NULL); free(hs); - if (status != UCS_OK) { - priskv_log_error("UCX: failed to send capability to server, status: %s\n", - ucs_status_string(status)); + if (ret) { + priskv_log_error("UCX: failed to send capability to server\n"); ret = -1; goto error; } /* receive response from server */ priskv_cm_ucx_handshake peer_hs; - status = ucs_socket_recv(conn->connfd, &peer_hs, sizeof(peer_hs)); - if (status != UCS_OK) { - priskv_log_error("UCX: failed to receive handshake msg from server, status: %s\n", - ucs_status_string(status)); + ret = priskv_safe_recv(conn->connfd, &peer_hs, sizeof(peer_hs), NULL, NULL); + if (ret) { + priskv_log_error("UCX: failed to receive handshake msg from server\n"); ret = -1; goto error; } @@ -493,10 +492,10 @@ static int priskv_ucx_handshake(priskv_transport_conn *conn, ucp_address_t **add ret = -1; goto error; } - status = ucs_socket_recv(conn->connfd, peer_worker_address, peer_worker_address_len); - if (status != UCS_OK) { - priskv_log_error("UCX: failed to receive peer_worker_address from server, status: %s\n", - ucs_status_string(status)); + ret = priskv_safe_recv(conn->connfd, peer_worker_address, peer_worker_address_len, NULL, + NULL); + if (ret) { + priskv_log_error("UCX: failed to receive peer_worker_address from server\n"); ret = -1; goto error; } diff --git a/cluster/client/Makefile b/cluster/client/Makefile index 99d8dc9..a105796 100644 --- a/cluster/client/Makefile +++ b/cluster/client/Makefile @@ -25,7 +25,7 @@ ifneq (,$(filter $(PRISKV_USE_CUDA),yes YES y Y 1)) CFLAGS += $(CUDA_LDFLAGS) -DPRISKV_USE_CUDA endif -PRISKV_CLUSTER_TARGETS = priskv-cluster-benchmark priskv-cluster-example +PRISKV_CLUSTER_TARGETS = priskv-cluster-benchmark priskv-cluster-example priskv-cluster-test_status PRISKV_CLUSTER_TARGETS_SRCS = $(patsubst priskv-cluster-%, %.c, $(PRISKV_CLUSTER_TARGETS)) ALL_SRCS = $(wildcard *.c) LIB_SRCS = $(filter-out $(PRISKV_CLUSTER_TARGETS_SRCS), $(wildcard *.c)) diff --git a/cluster/client/client.c b/cluster/client/client.c index 53891ea..1e03cbc 100644 --- a/cluster/client/client.c +++ b/cluster/client/client.c @@ -440,9 +440,8 @@ static priskvClusterRequest *priskvClusterRequestNew(priskvClusterNode *node, pr void *cbarg, RequestType type, const char *key, uint64_t timeout, priskvClusterClient *client) { - /* For ACQUIRE/SEAL, interpret 'timeout' from callers as pin_ttl_ms to keep API stable */ return priskvClusterRequestNewBase(node, sgl, nsgl, cb, NULL, cbarg, type, key, timeout, - false /*pin_key*/, 0 /*pin_ttl_ms*/, false /* unpin_key */, + false /* pin_key */, 0 /* pin_ttl_ms */, false /* unpin_key */, 0, 0, client); } @@ -452,9 +451,8 @@ priskvClusterZeroCopyRequestNew(priskvClusterNode *node, priskvClusterZeroCopyCa uint32_t alloc_length, uint64_t timeout, bool pin_key, uint64_t pin_ttl_ms, bool unpin_key, priskvClusterClient *client) { - /* Zero-copy requests never set unpin_key at construction */ return priskvClusterRequestNewBase(node, NULL, 0, NULL, cb, cbarg, type, key, timeout, pin_key, - pin_ttl_ms, unpin_key, alloc_length, 0 /*token*/, client); + pin_ttl_ms, unpin_key, alloc_length, 0 /* token */, client); } static void priskvClusterRequestFree(priskvClusterRequest *req) @@ -576,7 +574,6 @@ priskvClusterRequest *priskvClusterGetZeroCopyRequest(priskvClusterClient *clien return NULL; } - /* Use explicit pin_ttl_ms for ACQUIRE/SEAL; ALLOC ignores it */ priskvClusterRequest *req = priskvClusterZeroCopyRequestNew( node, cb, cbarg, type, key, alloc_length, timeout, pin_key, pin_ttl_ms, unpin_key, client); return req; @@ -610,18 +607,16 @@ int priskvClusterSubmitRequest(priskvClusterRequest *req) (uint64_t)req, priskvClusterZeroCopyRequestCallback); break; case SEAL: - priskv_seal_async(req->node->client, &req->token, req->pin_key /* pin_on_seal */, - req->pin_ttl_ms /* pin_ttl_ms */, (uint64_t)req, - priskvClusterRequestCallback); + priskv_seal_async(req->node->client, &req->token, req->pin_key, req->pin_ttl_ms, + (uint64_t)req, priskvClusterRequestCallback); break; case ACQUIRE: - priskv_acquire_async(req->node->client, req->key, req->pin_ttl_ms, - req->pin_key /* pin_on_acquire */, (uint64_t)req, + priskv_acquire_async(req->node->client, req->key, req->timeout, req->pin_key, + req->pin_ttl_ms, (uint64_t)req, priskvClusterZeroCopyRequestCallback); break; case RELEASE: - priskv_release_async(req->node->client, &req->token, - req->unpin_key /* unpin_on_release */, (uint64_t)req, + priskv_release_async(req->node->client, &req->token, req->unpin_key, (uint64_t)req, priskvClusterRequestCallback); break; case DROP: @@ -938,11 +933,11 @@ int priskvClusterAsyncSeal(priskvClusterClient *client, const char *key, const u bool pin_on_seal, uint64_t pin_ttl_ms, priskvClusterCallback cb, void *cbarg) { - priskvClusterRequest *req = priskvClusterGetRequest( - client, key, NULL, 0, cb, cbarg, pin_ttl_ms /* use timeout as pin_ttl_ms */, SEAL); + priskvClusterRequest *req = priskvClusterGetRequest(client, key, NULL, 0, cb, cbarg, 0, SEAL); if (req) { req->token = token ? *token : 0; req->pin_key = pin_on_seal; + req->pin_ttl_ms = pin_ttl_ms; } if (req == NULL) return -1; @@ -950,12 +945,13 @@ int priskvClusterAsyncSeal(priskvClusterClient *client, const char *key, const u return priskvClusterSubmitRequest(req); } -int priskvClusterAsyncAcquire(priskvClusterClient *client, const char *key, bool pin_on_acquire, - uint64_t pin_ttl_ms, priskvClusterZeroCopyCallback cb, void *cbarg) +int priskvClusterAsyncAcquire(priskvClusterClient *client, const char *key, uint64_t timeout, + bool pin_on_acquire, uint64_t pin_ttl_ms, + priskvClusterZeroCopyCallback cb, void *cbarg) { priskvClusterRequest *req = priskvClusterGetZeroCopyRequest( - client, key, cb, cbarg, 0 /* alloc_length */, 0 /* timeout */, pin_on_acquire /* pin_key */, - pin_ttl_ms, false /* unpin_key */, ACQUIRE); + client, key, cb, cbarg, 0 /* alloc_length */, timeout, pin_on_acquire, pin_ttl_ms, + false /* unpin_key */, ACQUIRE); if (req == NULL) return -1; @@ -1103,7 +1099,8 @@ priskvClusterStatus priskvClusterAcquire(priskvClusterClient *client, const char } priskv_memory_region region = {0}; - priskv_status status = priskv_acquire(node->client, key, pin_on_acquire, pin_ttl_ms, ®ion); + priskv_status status = + priskv_acquire(node->client, key, timeout, pin_on_acquire, pin_ttl_ms, ®ion); if (status == PRISKV_STATUS_OK) { if (addr) { *addr = region.addr; @@ -1117,7 +1114,7 @@ priskvClusterStatus priskvClusterAcquire(priskvClusterClient *client, const char } int priskvClusterAcquireRegion(priskvClusterClient *client, const char *key, uint64_t timeout, - bool pin_on_acquire, uint64_t pin_ttl_ms, + bool pin_on_acquire, uint64_t pin_ttl_ms, priskv_memory_region *region) { priskvClusterNode *node = priskvClusterGetNode(client, key); @@ -1125,8 +1122,7 @@ int priskvClusterAcquireRegion(priskvClusterClient *client, const char *key, uin return PRISKV_CLUSTER_STATUS_NO_SUCH_KEY; } - /* timeout is kept for API compatibility; pin_ttl_ms controls PIN TTL (0 uses server default) */ - return priskv_acquire(node->client, key, pin_on_acquire, pin_ttl_ms, region); + return priskv_acquire(node->client, key, timeout, pin_on_acquire, pin_ttl_ms, region); } priskvClusterStatus priskvClusterRelease(priskvClusterClient *client, const char *key, diff --git a/cluster/client/client.h b/cluster/client/client.h index 3797de7..12e7397 100644 --- a/cluster/client/client.h +++ b/cluster/client/client.h @@ -89,8 +89,9 @@ int priskvClusterAsyncAlloc(priskvClusterClient *client, const char *key, uint64 int priskvClusterAsyncSeal(priskvClusterClient *client, const char *key, const uint64_t *token, bool pin_on_seal, uint64_t pin_ttl_ms, priskvClusterCallback cb, void *cbarg); -int priskvClusterAsyncAcquire(priskvClusterClient *client, const char *key, bool pin_on_acquire, - uint64_t pin_ttl_ms, priskvClusterZeroCopyCallback cb, void *cbarg); +int priskvClusterAsyncAcquire(priskvClusterClient *client, const char *key, uint64_t timeout, + bool pin_on_acquire, uint64_t pin_ttl_ms, + priskvClusterZeroCopyCallback cb, void *cbarg); int priskvClusterAsyncRelease(priskvClusterClient *client, const char *key, const uint64_t *token, bool unpin_key, priskvClusterCallback cb, void *cbarg); int priskvClusterAsyncDrop(priskvClusterClient *client, const char *key, const uint64_t *token, @@ -105,7 +106,7 @@ priskvClusterStatus priskvClusterAlloc(priskvClusterClient *client, const char * priskvClusterStatus priskvClusterSeal(priskvClusterClient *client, const char *key, const uint64_t *token, bool pin_on_seal, uint64_t pin_ttl_ms); priskvClusterStatus priskvClusterAcquire(priskvClusterClient *client, const char *key, - uint64_t timeout, bool pin_on_acquire, uint64_t pin_ttl_ms, + uint64_t timeout, bool pin_on_acquire, uint64_t pin_ttl_ms, uint64_t *addr_offset, uint32_t *valuelen); priskvClusterStatus priskvClusterRelease(priskvClusterClient *client, const char *key, const uint64_t *token, bool unpin_on_release); @@ -121,5 +122,5 @@ priskvClusterStatus priskvClusterStatusFromPriskvStatus(priskv_status status); int priskvClusterAllocRegion(priskvClusterClient *client, const char *key, uint32_t alloc_length, uint64_t timeout, priskv_memory_region *region); int priskvClusterAcquireRegion(priskvClusterClient *client, const char *key, uint64_t timeout, - bool pin_on_acquire, uint64_t pin_ttl_ms, + bool pin_on_acquire, uint64_t pin_ttl_ms, priskv_memory_region *region); diff --git a/include/priskv-config.h b/include/priskv-config.h index cc0e0ad..ebb30d3 100644 --- a/include/priskv-config.h +++ b/include/priskv-config.h @@ -42,8 +42,7 @@ extern "C" .prefix = PREFIX, \ .table = TABLE, \ .size = sizeof(TYPE), \ - .list = {NULL, NULL}, \ - .flags = 0}; + .list = {NULL, NULL}}; #define PRISKV_CONFIG_GET_TABLE(TABLE) &g_##TABLE##_config_entry #define PRISKV_ENV_PREFIX "PRISKV_" diff --git a/include/priskv-utils.h b/include/priskv-utils.h index 14e4bea..7f75a0c 100644 --- a/include/priskv-utils.h +++ b/include/priskv-utils.h @@ -148,6 +148,55 @@ static inline void priskv_inet_ntop(struct sockaddr *addr, char *dst) } } +static inline int priskv_sock_io(int sock, ssize_t (*sock_call)(int, void *, size_t, int), + int poll_events, void *data, size_t size, + void (*progress)(void *arg), void *arg, const char *name) +{ + size_t total = 0; + struct pollfd pfd; + int ret; + + while (total < size) { + pfd.fd = sock; + pfd.events = poll_events; + pfd.revents = 0; + + ret = poll(&pfd, 1, 1); /* poll for 1ms */ + if (ret > 0) { + ret = sock_call(sock, (char *)data + total, size - total, 0); + if ((ret == 0) && (poll_events & POLLIN)) { + return -1; + } + if (ret < 0) { + return -1; + } + total += ret; + } else if ((ret < 0) && (errno != EINTR)) { + return -1; + } + + /* progress user context */ + if (progress != NULL) { + progress(arg); + } + } + return 0; +} + +static inline int priskv_safe_send(int sock, void *data, size_t size, void (*progress)(void *arg), + void *arg) +{ + typedef ssize_t (*sock_call)(int, void *, size_t, int); + + return priskv_sock_io(sock, (sock_call)send, POLLOUT, data, size, progress, arg, "send"); +} + +static inline int priskv_safe_recv(int sock, void *data, size_t size, void (*progress)(void *arg), + void *arg) +{ + return priskv_sock_io(sock, recv, POLLIN, data, size, progress, arg, "recv"); +} + static inline unsigned long priskv_rdtsc(void) { unsigned long low, high; diff --git a/lib/config.c b/lib/config.c index af1e95e..835422d 100644 --- a/lib/config.c +++ b/lib/config.c @@ -15,6 +15,7 @@ #include "priskv-config.h" #include "priskv-log.h" #include +#include static const char *priskv_transport_backend_names[] = {[PRISKV_TRANSPORT_BACKEND_RDMA] = "RDMA", [PRISKV_TRANSPORT_BACKEND_UCX] = "UCX", @@ -128,10 +129,10 @@ static void priskv_config_init_impl(void) "Shared memory will be enabled automatically."); } - ucs_config_parser_print_opts( - stdout, "PrisKV Environment Variables", &g_config, priskv_config_table, NULL, - PRISKV_ENV_PREFIX, - UCS_CONFIG_PRINT_CONFIG | UCS_CONFIG_PRINT_HEADER | UCS_CONFIG_PRINT_DOC, NULL); + ucs_config_parser_print_opts(stdout, "PrisKV Environment Variables", &g_config, + priskv_config_table, NULL, PRISKV_ENV_PREFIX, + UCS_CONFIG_PRINT_CONFIG | UCS_CONFIG_PRINT_HEADER | + UCS_CONFIG_PRINT_DOC); // logging priskv_set_log_level(g_config.logging.log_level); @@ -152,7 +153,8 @@ void priskv_config_init(void) ucs_status_t priskv_config_parser_fill_opts(void *opts, ucs_config_global_list_entry_t *entry, const char *env_prefix, int ignore_errors) { - return ucs_config_parser_fill_opts(opts, entry, env_prefix, ignore_errors); + /* UCX v1.12+: ucs_config_parser_fill_opts(opts, fields, env_prefix, table_prefix, ignore_errors) */ + return ucs_config_parser_fill_opts(opts, entry->table, env_prefix, entry->prefix, ignore_errors); } void priskv_config_parser_release_opts(void *opts, ucs_config_field_t *fields) @@ -163,6 +165,6 @@ void priskv_config_parser_release_opts(void *opts, ucs_config_field_t *fields) ucs_status_t priskv_config_parser_set_value(void *opts, ucs_config_field_t *fields, const char *prefix, const char *name, const char *value) { - - return ucs_config_parser_set_value(opts, fields, prefix, name, value); + (void)prefix; + return ucs_config_parser_set_value(opts, fields, name, value); } diff --git a/lib/ucx.c b/lib/ucx.c index 59781de..080e6b1 100644 --- a/lib/ucx.c +++ b/lib/ucx.c @@ -205,8 +205,8 @@ ucs_status_t priskv_ucx_munmap(priskv_ucx_memh *memh) } if (memh->rkey_buffer) { - ucp_memh_buffer_release_params_t params = {.field_mask = 0}; - ucp_memh_buffer_release(memh->rkey_buffer, ¶ms); + /* Packed RKEY buffer release (UCX 1.12+). */ + ucp_rkey_buffer_release(memh->rkey_buffer); memh->rkey_buffer = NULL; } ucp_mem_unmap(memh->context->handle, memh->handle); @@ -255,7 +255,10 @@ priskv_ucx_worker *priskv_ucx_worker_create(priskv_ucx_context *context, uint64_ worker->efd = -1; } - status = ucp_worker_get_address(worker->handle, &worker->address, &worker->address_len); + /* UCX uses size_t* for address length; wire format uses uint32_t. */ + size_t addr_len = 0; + status = ucp_worker_get_address(worker->handle, &worker->address, &addr_len); + worker->address_len = (uint32_t)addr_len; PRISKV_UCX_RETURN_IF_ERROR( status, "priskv_ucx_worker_init: failed to get address", { free(worker); }, NULL); @@ -497,7 +500,8 @@ static ucs_status_ptr_t priskv_ucx_post_tag_recv(priskv_ucx_worker *worker, pris ucp_request_param_t param = {.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_DATATYPE | - UCP_OP_ATTR_FIELD_USER_DATA, + UCP_OP_ATTR_FIELD_USER_DATA | + UCP_OP_ATTR_FIELD_RECV_INFO, .datatype = ucp_dt_make_contig(1), .cb.recv = priskv_ucx_request_tag_recv_cb_intl, .user_data = request}; diff --git a/pypriskv/priskv/priskv_client.py b/pypriskv/priskv/priskv_client.py index bd0157a..56bb93a 100644 --- a/pypriskv/priskv/priskv_client.py +++ b/pypriskv/priskv/priskv_client.py @@ -83,11 +83,12 @@ def seal(self, def acquire(self, key: str, - pin_ttl_ms: int = 0, + timeout: int = client.PRISKV_KEY_MAX_TIMEOUT, pin_on_acquire: bool = False, + pin_ttl_ms: int = 0, ) -> Tuple[int, client.MemoryRegion]: - """Acquire zero-copy read region. When pin_on_acquire is True, pin_ttl_ms specifies the PIN TTL in milliseconds; 0 means server default.""" - status, region = client.acquire(self.conn, key, pin_on_acquire, pin_ttl_ms) + """Acquire: timeout is transport/key timeout; pin_ttl_ms is PIN TTL when pin_on_acquire is True.""" + status, region = client.acquire(self.conn, key, timeout, pin_on_acquire, pin_ttl_ms) return status, region def release(self, diff --git a/pypriskv/pybind.cpp b/pypriskv/pybind.cpp index f0a6576..9b7b58b 100644 --- a/pypriskv/pybind.cpp +++ b/pypriskv/pybind.cpp @@ -139,12 +139,13 @@ std::tuple priskv_alloc_wrapper(uintptr_t client, std::string key } std::tuple priskv_acquire_wrapper(uintptr_t client, std::string key, - uint64_t pin_ttl_ms, bool pin_on_acquire) + uint64_t timeout, bool pin_on_acquire, + uint64_t pin_ttl_ms) { uint64_t addr_offset = 0; uint32_t value_length = 0; - int ret = priskvClusterAcquire((priskvClusterClient *)client, key.c_str(), 0 /* timeout */, + int ret = priskvClusterAcquire((priskvClusterClient *)client, key.c_str(), timeout, pin_on_acquire, pin_ttl_ms, &addr_offset, &value_length); return {ret, addr_offset, value_length}; } @@ -327,18 +328,17 @@ PYBIND11_MODULE(_priskv_client, m) m.def( "acquire", - [](uintptr_t client, std::string key, bool pin_on_acquire, uint64_t pin_ttl_ms) { + [](uintptr_t client, std::string key, uint64_t timeout, bool pin_on_acquire, + uint64_t pin_ttl_ms) { priskv_memory_region region {0}; - /* Pass timeout=0 and explicit pin_ttl_ms */ - int ret = - priskvClusterAcquireRegion((priskvClusterClient *)client, key.c_str(), - 0 /* timeout */, pin_on_acquire, pin_ttl_ms, ®ion); + int ret = priskvClusterAcquireRegion((priskvClusterClient *)client, key.c_str(), timeout, + pin_on_acquire, pin_ttl_ms, ®ion); return py::make_tuple(ret, region); }, - py::arg("client"), py::arg("key"), py::arg("pin_on_acquire") = false, - py::arg("pin_ttl_ms") = 0, - "A function to acquire memory region for read; when PIN is requested, pin_ttl_ms sets TTL " - "in ms (0 uses server default)."); + py::arg("client"), py::arg("key"), py::arg("timeout") = PRISKV_KEY_MAX_TIMEOUT, + py::arg("pin_on_acquire") = false, py::arg("pin_ttl_ms") = 0, + "A function to acquire memory region for read; timeout is transport/key timeout; " + "pin_ttl_ms is PIN TTL when pin_on_acquire is true (0 uses server default)."); m.def( "release", diff --git a/pypriskv/testing.py b/pypriskv/testing.py index c05a1ab..77c660c 100644 --- a/pypriskv/testing.py +++ b/pypriskv/testing.py @@ -540,17 +540,18 @@ def test_unpin_no_such_key(self): def test_pin_ttl_expire_on_acquire(self): TEST_KEY = "py_pin_ttl_acquire" SIZE = 256 + TIMEOUT = 3000 # TTL is in milliseconds; sleep slightly longer than 1.5s to avoid clock granularity issues PIN_TTL_MS = 1500 SLEEP_SEC = 1.6 - status, region = self.client.alloc(TEST_KEY, SIZE, 3000) + status, region = self.client.alloc(TEST_KEY, SIZE, TIMEOUT) assert status == 0 status = self.client.seal(TEST_KEY, region) assert status == 0 - # ACQUIRE with PIN(with TTL) - status, acq = self.client.acquire(TEST_KEY, PIN_TTL_MS, True) + # ACQUIRE with PIN and explicit PIN TTL (transport timeout is TIMEOUT) + status, acq = self.client.acquire(TEST_KEY, TIMEOUT, pin_on_acquire=True, pin_ttl_ms=PIN_TTL_MS) assert status == 0, f"acquire(pin ttl) failed: {status}" # wait TTL expired @@ -560,7 +561,7 @@ def test_pin_ttl_expire_on_acquire(self): assert status == priskv.PRISKV_STATUS.PRISKV_STATUS_UNPIN_NOT_CLOSED, \ f"expected UNPIN_NOT_CLOSED after TTL expiry, got {status}" - status2, acq2 = self.client.acquire(TEST_KEY, 0, False) + status2, acq2 = self.client.acquire(TEST_KEY, TIMEOUT, pin_on_acquire=False, pin_ttl_ms=0) assert status2 == priskv.PRISKV_STATUS.PRISKV_STATUS_OK status2 = self.client.release(TEST_KEY, acq2, unpin_on_release=True) assert status2 == priskv.PRISKV_STATUS.PRISKV_STATUS_UNPIN_NOT_CLOSED, \ diff --git a/run_e2e_test.py b/run_e2e_test.py index 88024de..f1f7f79 100755 --- a/run_e2e_test.py +++ b/run_e2e_test.py @@ -28,7 +28,11 @@ def find_rdma_dev(): for dev in os.listdir(ibclass): netdev = ibclass + dev + "/ports/1/gid_attrs/ndevs/0" with open(netdev) as fp: - addrs = netifaces.ifaddresses(fp.readline().strip("\n")) + iface = fp.readline().strip("\n") + try: + addrs = netifaces.ifaddresses(iface) + except ValueError: + continue if netifaces.AF_INET in addrs: ipv4_addr = addrs[netifaces.AF_INET][0]["addr"] print( diff --git a/run_unit_test.py b/run_unit_test.py index 3168117..d009a6b 100755 --- a/run_unit_test.py +++ b/run_unit_test.py @@ -35,7 +35,7 @@ def priskv_unit_test(parallel: bool = True): "./server/test/test-buddy-mt", "./server/test/test-kv", "./server/test/test-kv-mt", "./server/test/test-memory --no-tmpfs", "./server/test/test-slab", "./server/test/test-transport 8 200", - "./server/test/test-kv-pin-ttl" + "./server/test/test-kv-pin-ttl", ] print("---- PrisKV UNIT TEST ----") diff --git a/server/Makefile b/server/Makefile index 4e5c471..bd5322e 100644 --- a/server/Makefile +++ b/server/Makefile @@ -38,12 +38,18 @@ else CFLAGS += -O0 endif +ifneq (,$(filter $(WITH_RDMA),yes YES y Y 1)) +CFLAGS += -DWITH_RDMA +else +RDMA_EXCLUDE = ./rdma.c ./transport/rdma.c +endif + PRISKV_BINPATH = usr/bin PRISKV_MANPATH = usr/share/man PRISKV_SERVER_TARGETS = priskv-server priskv-memfile PRISKV_SERVER_TARGETS_SRCS = $(patsubst priskv-%, ./%.c, $(PRISKV_SERVER_TARGETS)) -PRISKV_SERVER_SRCS = $(filter-out $(PRISKV_SERVER_TARGETS_SRCS), $(shell find . -path ./test -prune -o -name "*.c" -print)) +PRISKV_SERVER_SRCS = $(filter-out $(PRISKV_SERVER_TARGETS_SRCS) $(RDMA_EXCLUDE), $(shell find . -path ./test -prune -o -name "*.c" -print)) PRISKV_SERVER_OBJS := $(PRISKV_SERVER_SRCS:%.c=%.o) PRISKV_SERVER_DEPS := $(PRISKV_SERVER_OBJS:%.o=%.d) diff --git a/server/kv.c b/server/kv.c index b79376f..6a68d8e 100644 --- a/server/kv.c +++ b/server/kv.c @@ -954,10 +954,10 @@ int priskv_key_unpin_latest(void *_kv, void *_keynode) priskv_resp_status resp = priskv_pin_count_delta_latest(kv, node->key, node->keylen, -1, 0); /* Stats: count every UNPIN attempt; record not-closed cases. */ - __sync_fetch_and_add(&kv->pin_stats.unpin_ops, 1); - if (resp == PRISKV_RESP_STATUS_UNPIN_NOT_CLOSED) { - __sync_fetch_and_add(&kv->pin_stats.unpin_not_closed, 1); - } + __sync_fetch_and_add(&kv->pin_stats.unpin_ops, 1); + if (resp == PRISKV_RESP_STATUS_UNPIN_NOT_CLOSED) { + __sync_fetch_and_add(&kv->pin_stats.unpin_not_closed, 1); + } if (resp != PRISKV_RESP_STATUS_OK) { priskv_log_warn("KV: UNPIN_LATEST status=%d for node=%p (len=%u)\n", resp, (void *)node, node->keylen); diff --git a/server/kv.h b/server/kv.h index 061e654..9e9ab02 100644 --- a/server/kv.h +++ b/server/kv.h @@ -165,6 +165,7 @@ int priskv_publish_node(void *_kv, void *_keynode); int priskv_publish_node_with_pin(void *_kv, void *_keynode, bool pin_on_publish, uint64_t ttl_ms); int priskv_drop_node(void *_kv, void *_keynode); +/* Pin/Unpin controls for lifecycle protection */ int priskv_key_unpin_latest(void *_kv, void *_keynode); /* Pin on the latest version; ttl_ms==0 uses the default TTL */ int priskv_key_pin_latest(void *_kv, void *_keynode, uint64_t ttl_ms); @@ -174,7 +175,6 @@ uint64_t priskv_get_pin_ops(void *_kv); uint64_t priskv_get_pin_failed_ops(void *_kv); uint64_t priskv_get_unpin_ops(void *_kv); uint64_t priskv_get_unpin_not_closed(void *_kv); - #if defined(__cplusplus) } #endif diff --git a/server/test/Makefile b/server/test/Makefile index ce8a819..27cad45 100644 --- a/server/test/Makefile +++ b/server/test/Makefile @@ -33,7 +33,7 @@ ifneq (,$(filter $(PRISKV_USE_CUDA),yes YES y Y 1)) CFLAGS += $(CUDA_LDFLAGS) -DPRISKV_USE_CUDA endif -.PHONY: $(TEST_BUDDY) ${TEST_BUDDY_MT} $(TEST_SLAB) $(TEST_SLAB_MT) $(TEST_KV) $(TEST_KV_MT) $(TEST_TRANSPORT) $(TEST_MEMORY) $(TEST_ACL) $(TEST_KV_EXPIRE_ROUTINE) $(TEST_BE_REDIS) +.PHONY: $(TEST_BUDDY) ${TEST_BUDDY_MT} $(TEST_SLAB) $(TEST_SLAB_MT) $(TEST_KV) $(TEST_KV_MT) $(TEST_TRANSPORT) $(TEST_MEMORY) $(TEST_ACL) $(TEST_KV_EXPIRE_ROUTINE) $(TEST_BE_REDIS) $(TEST_KV_PIN_TTL) OBJS = ../memory.o ../kv.o ../slab.o ../crc.o ../acl.o all: $(TEST_BUDDY) ${TEST_BUDDY_MT} $(TEST_SLAB) $(TEST_SLAB_MT) $(TEST_KV) $(TEST_KV_MT) $(TEST_TRANSPORT) $(TEST_MEMORY) $(TEST_ACL) $(TEST_KV_EXPIRE_ROUTINE) $(TEST_BE_REDIS) $(TEST_KV_PIN_TTL) @@ -87,7 +87,6 @@ else $(TEST_KV_EXPIRE_ROUTINE): @echo "UCX not available, skip $(TEST_KV_EXPIRE_ROUTINE)" endif - ifeq ($(shell pkg-config --exists ucx; echo $$?),0) $(TEST_KV_PIN_TTL): $(OBJS) $(CC) test_kv_pin_ttl.c ../../lib/workqueue.c ../../lib/threads.c ../../lib/event.c ../../lib/ucx.c ../../lib/config.c ../memory.c ../kv.c ../slab.c ../buddy.c ../crc.c ../../lib/log.c ../backend/backend.c ../transport/transport.c ../transport/rdma.c ../transport/ucx.c ../tiering.c ../acl.c $(CFLAGS) $(UCX_CFLAGS) -o $(TEST_KV_PIN_TTL) -lmount -lpthread -lrdmacm -libverbs $(UCX_LIBS) @@ -96,7 +95,6 @@ $(TEST_KV_PIN_TTL): @echo "UCX not available, skip $(TEST_KV_PIN_TTL)" endif - $(TEST_BE_REDIS): $(CC) test_be_redis.c ../../lib/log.c ../../lib/event.c ../../lib/workqueue.c ../../lib/threads.c ../backend/backend.c ../backend/be_redis.c $(CFLAGS) -o $(TEST_BE_REDIS) -levent -lhiredis diff --git a/server/transport/transport.c b/server/transport/transport.c index 8edb376..ee8f176 100644 --- a/server/transport/transport.c +++ b/server/transport/transport.c @@ -38,7 +38,9 @@ priskv_transport_server g_transport_server = { bool priskv_test_token_add_fail_once = false; extern priskv_transport_driver priskv_transport_driver_ucx; +#ifdef WITH_RDMA extern priskv_transport_driver priskv_transport_driver_rdma; +#endif uint32_t g_slow_query_threshold_latency_us = SLOW_QUERY_THRESHOLD_LATENCY_US; @@ -60,10 +62,12 @@ static void __attribute__((constructor)) priskv_server_transport_init(void) driver = &priskv_transport_driver_ucx; priskv_log_notice("Using UCX transport backend\n"); break; +#ifdef WITH_RDMA case PRISKV_TRANSPORT_BACKEND_RDMA: driver = &priskv_transport_driver_rdma; priskv_log_notice("Using RDMA transport backend\n"); break; +#endif default: priskv_log_error("Unknown transport backend: %d\n", backend); break; @@ -310,18 +314,35 @@ int priskv_transport_handle_recv(priskv_transport_conn *conn, priskv_request *re return -EPROTO; } - keylen = len - keyoff; + /* UCX callback's `length` may be unreliable under some UCX versions. + * Rely on the protocol header's key_length instead of deriving it from `len`. */ + keylen = be16toh(req->key_length); + if (!keylen) { - priskv_log_warn("Transport: <%s - %s> empty key. recv %d, less than %d, nsgl 0x%x\n", + priskv_log_warn("Transport: <%s - %s> empty key. len(%u) keyoff(%u) nsgl 0x%x\n", conn->local_addr, conn->peer_addr, len, keyoff, nsgl); driver->send_response(conn, req->request_id, PRISKV_RESP_STATUS_KEY_EMPTY, 0, 0, 0); return -EPROTO; } + /* Optional sanity check: if UCX `len` is sane and smaller than what we expect, treat as protocol error. */ + if (len != 0 && len < (uint32_t)(keyoff + keylen)) { + priskv_log_warn("Transport: <%s - %s> invalid key. recv len(%u) < keyoff(%u)+keylen(%u)\n", + conn->local_addr, conn->peer_addr, len, keyoff, keylen); + driver->send_response(conn, req->request_id, PRISKV_RESP_STATUS_INVALID_COMMAND, 0, 0, 0); + return -EPROTO; + } + if (keylen > conn->conn_cap.max_key_length) { - priskv_log_warn("Transport: <%s - %s> invalid key. key(%d) exceeds max_key_length(%d)\n", - conn->local_addr, conn->peer_addr, keylen, conn->conn_cap.max_key_length); + uint16_t raw_key_length = be16toh(req->key_length); + uint16_t raw_nsgl = be16toh(req->nsgl); + uint32_t raw_alloc_length = be32toh(req->alloc_length); + priskv_log_warn( + "Transport: <%s - %s> invalid key. len(%u) keyoff(%u) keylen(%u) " + "key_length(%u) nsgl(%u) alloc_length(%u) exceeds max_key_length(%u)\n", + conn->local_addr, conn->peer_addr, len, keyoff, keylen, raw_key_length, raw_nsgl, + raw_alloc_length, conn->conn_cap.max_key_length); driver->send_response(conn, req->request_id, PRISKV_RESP_STATUS_KEY_TOO_BIG, 0, 0, 0); return -EPROTO; } @@ -593,14 +614,10 @@ int priskv_transport_handle_recv(priskv_transport_conn *conn, priskv_request *re PRISKV_RESP_STATUS_PERMISSION_DENIED, 0, 0, 0); break; } - /* Atomically publish and optionally pin; treat req.timeout as the TTL (ms) for this - * pin. */ - /* Use pin_ttl_ms; 0 means default TTL. Only read TTL when pin_on_seal is set. */ + /* Atomically publish and optionally pin; pin_ttl_ms 0 means default TTL. */ status = priskv_publish_node_with_pin( conn->kv, keynode, (flags & PRISKV_REQ_FLAG_PIN_ON_SEAL) != 0, (flags & PRISKV_REQ_FLAG_PIN_ON_SEAL) ? pin_ttl_ms : 0); - /* TTL registration is handled in the KV layer (publish critical section); no need to - * duplicate here. */ priskv_transport_token_del(conn, token); ret = driver->send_response(conn, req->request_id, status, 0, 0, 0); } @@ -614,26 +631,18 @@ int priskv_transport_handle_recv(priskv_transport_conn *conn, priskv_request *re } { uint64_t addr_offset = 0; - /* Enforce ACQUIRE+PIN all-or-nothing semantics when requested: - * - If PIN_ON_ACQUIRE is set, attempt to pin the latest visible version first. - * - If pin fails (e.g., key deleted concurrently), fail the whole ACQUIRE and release - * the reference acquired by priskv_get_key. - */ + /* Enforce ACQUIRE+PIN all-or-nothing semantics when requested. */ if (flags & PRISKV_REQ_FLAG_PIN_ON_ACQUIRE) { - /* Use pin_ttl_ms; 0 means default TTL. */ priskv_resp_status presp = priskv_key_pin_latest(conn->kv, keynode, pin_ttl_ms); if (presp != PRISKV_RESP_STATUS_OK) { - /* Atomicity: no token created, drop our reference and return pin's status */ priskv_get_key_end(keynode); ret = driver->send_response(conn, req->request_id, presp, 0, 0, 0); break; } } - /* Create token only after (optional) pin succeeds to keep ACQUIRE+PIN atomic */ uint64_t token = priskv_transport_token_add(conn, keynode, PRISKV_TOKEN_TYPE_ACQUIRE); if (!token) { - /* Roll back pin if we pinned, then release the key reference */ if (flags & PRISKV_REQ_FLAG_PIN_ON_ACQUIRE) { (void)priskv_key_unpin_latest(conn->kv, keynode); } @@ -642,7 +651,6 @@ int priskv_transport_handle_recv(priskv_transport_conn *conn, priskv_request *re 0, 0, 0); break; } - /* TTL registration is handled in the KV layer (pin path); no need to duplicate here. */ status = priskv_value_addr_offset(conn->kv, val, &addr_offset); ret = driver->send_response(conn, req->request_id, status, valuelen, addr_offset, token); @@ -675,19 +683,10 @@ int priskv_transport_handle_recv(priskv_transport_conn *conn, priskv_request *re PRISKV_RESP_STATUS_PERMISSION_DENIED, 0, 0, 0); break; } - /* RELEASE semantics when UNPIN is requested: - * - If PRISKV_REQ_FLAG_UNPIN_ON_RELEASE is set and unpin fails (e.g., NO_SUCH_KEY / - * UNPIN_NOT_CLOSED), we currently release the ACQUIRE reference and delete the - * token as a best-effort cleanup, and return the unpin status to the client. - * - Otherwise, we release the reference and delete the token, returning OK. - * Note: This favors simplicity over retry semantics; adjust if caller requires a - * strict all-or-nothing behavior. - */ priskv_resp_status resp = PRISKV_RESP_STATUS_OK; if (flags & PRISKV_REQ_FLAG_UNPIN_ON_RELEASE) { resp = priskv_key_unpin_latest(conn->kv, keynode); } - /* Either UNPIN succeeded or not requested: finish RELEASE */ priskv_get_key_end(keynode); priskv_transport_token_del(conn, token); ret = driver->send_response(conn, req->request_id, resp, 0, 0, 0); diff --git a/server/transport/ucx.c b/server/transport/ucx.c index 8758e26..b516916 100644 --- a/server/transport/ucx.c +++ b/server/transport/ucx.c @@ -237,10 +237,9 @@ static inline void priskv_ucx_reject(priskv_transport_conn *client, priskv_cm_st .status = htobe16(status), .value = htobe64(value), }; - ucs_status_t ucs_status = ucs_socket_send(client->connfd, &rej_msg_be, sizeof(rej_msg_be)); - if (ucs_status != UCS_OK) { - priskv_log_error("UCX: send reject message failed, status: %s\n", - ucs_status_string(ucs_status)); + int ret = priskv_safe_send(client->connfd, &rej_msg_be, sizeof(rej_msg_be), NULL, NULL); + if (ret < 0) { + priskv_log_error("UCX: send reject message failed: %m\n"); } } @@ -275,11 +274,9 @@ static inline int priskv_ucx_accept(priskv_transport_conn *client) client->peer_addr, address_len, print_len, worker_address_hex); } - ucs_status_t status = ucs_socket_send(client->connfd, hs, hs_size); - if (status != UCS_OK) { - ret = -1; - priskv_log_error("UCX: send accept message failed, status: %s\n", - ucs_status_string(status)); + ret = priskv_safe_send(client->connfd, hs, hs_size, NULL, NULL); + if (ret < 0) { + priskv_log_error("UCX: send accept message failed: %m\n"); goto out_free_msg; } @@ -294,7 +291,6 @@ static inline int priskv_ucx_accept(priskv_transport_conn *client) static inline int priskv_ucx_handle_handshake(void *arg) { int ret; - ucs_status_t sock_status; priskv_cm_ucx_handshake peer_hs; priskv_cm_status status; @@ -304,10 +300,9 @@ static inline int priskv_ucx_handle_handshake(void *arg) int connfd = client->connfd; /* #step0, recv handshake msg */ - sock_status = ucs_socket_recv(connfd, &peer_hs, sizeof(peer_hs)); - if (sock_status != UCS_OK) { - priskv_log_error("UCX: recv handshake msg failed, status: %s\n", - ucs_status_string(sock_status)); + ret = priskv_safe_recv(connfd, &peer_hs, sizeof(peer_hs), NULL, NULL); + if (ret < 0) { + priskv_log_error("UCX: recv handshake msg failed: %m\n"); ucs_close_fd(&connfd); return -1; } @@ -325,10 +320,9 @@ static inline int priskv_ucx_handle_handshake(void *arg) ucs_close_fd(&connfd); return -1; } - sock_status = ucs_socket_recv(connfd, peer_worker_address, peer_worker_address_len); - if (sock_status != UCS_OK) { - priskv_log_error("UCX: recv peer address failed, status: %s\n", - ucs_status_string(sock_status)); + ret = priskv_safe_recv(connfd, peer_worker_address, peer_worker_address_len, NULL, NULL); + if (ret < 0) { + priskv_log_error("UCX: recv peer address failed: %m\n"); ucs_close_fd(&connfd); return -1; } @@ -478,19 +472,15 @@ static inline void priskv_ucx_handle_cm(int fd, void *opaque, uint32_t ev) connfd = accept(listener->listenfd, (struct sockaddr *)&client_addr, &client_addr_len); if (connfd < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { - // no connection available, return return; } else if (errno == EINTR) { - // interrupted by signal, try again goto again; } else { - // other errors, log and return priskv_log_error("UCX: accept on listenfd %d failed: %m\n", listener->listenfd); return; } } - // got a connection priskv_inet_ntop(&client_addr, peer_addr); priskv_log_info("UCX: accept on listenfd %d, connfd %d, client addr %s\n", listener->listenfd, connfd, peer_addr);