diff --git a/run_unit_test.py b/run_unit_test.py index 611d7f3..3168117 100755 --- a/run_unit_test.py +++ b/run_unit_test.py @@ -34,7 +34,7 @@ def priskv_unit_test(parallel: bool = True): "./server/test/test-slab-mt", "./server/test/test-buddy", "./server/test/test-buddy-mt", "./server/test/test-kv", "./server/test/test-kv-mt", "./server/test/test-memory --no-tmpfs", - "./server/test/test-slab", "./server/test/test-transport", + "./server/test/test-slab", "./server/test/test-transport 8 200", "./server/test/test-kv-pin-ttl" ] diff --git a/server/kv.c b/server/kv.c index 0daee68..b79376f 100644 --- a/server/kv.c +++ b/server/kv.c @@ -806,6 +806,8 @@ int priskv_alloc_node_private(void *_kv, uint8_t *key, uint16_t keylen, uint8_t * - If a same-named published key exists, delete the old node (pop+free) first, then insert the * new node into the hash table. * - Join LRU and clear inprocess so it becomes readable (ACQUIRE/GET). + * - Hold a temporary reference across the publish window to prevent premature reclamation while + * the node is being made visible and integrated into LRU; balanced after old-node cleanup. */ static int __priskv_publish_node_with_pin(priskv_kv *kv, priskv_key *keynode, bool pin_on_publish, uint64_t ttl_ms) @@ -860,8 +862,21 @@ static int __priskv_publish_node_with_pin(priskv_kv *kv, priskv_key *keynode, bo /* Centralized increment + TTL extension */ __pin_update_locked(keynode, ttl_ms); pthread_spin_unlock(&keynode->lock); + /* Count pin-on-publish as a PIN operation for observability parity + * with ACQUIRE+PIN. We intentionally do this here (not by calling + * priskv_key_pin_latest) to avoid lock-order issues during publish. */ + __sync_fetch_and_add(&kv->pin_stats.pin_ops, 1); } + /* + * Take a temporary reference before making the node visible. + * Rationale: concurrent GET/UNPIN may briefly observe the node between + * visibility and LRU integration. This extra ref ensures the node cannot + * reach refcnt==0 and be reclaimed in that window. It is released after + * LRU join and old-node cleanup. + */ + priskv_keynode_ref(keynode); + /* Make the new node visible in the same critical section. */ list_add_tail(&hash_head->head, &keynode->entry); pthread_spin_unlock(&hash_head->lock); @@ -876,6 +891,9 @@ static int __priskv_publish_node_with_pin(priskv_kv *kv, priskv_key *keynode, bo __priskv_del_key(kv, old_keynode); } + /* Balance the temporary publish-time reference. */ + priskv_keynode_deref(keynode); + return PRISKV_RESP_STATUS_OK; } @@ -928,12 +946,21 @@ int priskv_key_unpin_latest(void *_kv, void *_keynode) return PRISKV_RESP_STATUS_SERVER_ERROR; } + if (node->kv != kv || node->keylen == 0) { + priskv_log_warn("KV: UNPIN_LATEST stale-handle? node=%p kv_match=%d keylen=%u\n", + (void *)node, node->kv == kv, node->keylen); + } + priskv_resp_status resp = priskv_pin_count_delta_latest(kv, node->key, node->keylen, -1, 0); /* Stats: count every UNPIN attempt; record not-closed cases. */ - kv->pin_stats.unpin_ops++; - if (resp == PRISKV_RESP_STATUS_UNPIN_NOT_CLOSED) { - kv->pin_stats.unpin_not_closed++; + __sync_fetch_and_add(&kv->pin_stats.unpin_ops, 1); + if (resp == PRISKV_RESP_STATUS_UNPIN_NOT_CLOSED) { + __sync_fetch_and_add(&kv->pin_stats.unpin_not_closed, 1); + } + if (resp != PRISKV_RESP_STATUS_OK) { + priskv_log_warn("KV: UNPIN_LATEST status=%d for node=%p (len=%u)\n", resp, (void *)node, + node->keylen); } return resp; } @@ -950,10 +977,10 @@ int priskv_key_pin_latest(void *_kv, void *_keynode, uint64_t ttl_ms) priskv_resp_status resp = priskv_pin_count_delta_latest(kv, node->key, node->keylen, +1, ttl_ms); if (resp == PRISKV_RESP_STATUS_OK) { - kv->pin_stats.pin_ops++; + __sync_fetch_and_add(&kv->pin_stats.pin_ops, 1); } else { /* Count failed PIN attempts (e.g., NO_SUCH_KEY) */ - kv->pin_stats.pin_failed_ops++; + __sync_fetch_and_add(&kv->pin_stats.pin_failed_ops, 1); } return resp; } diff --git a/server/test/test_transport.c b/server/test/test_transport.c index 3761f93..80f52c2 100644 --- a/server/test/test_transport.c +++ b/server/test/test_transport.c @@ -36,12 +36,19 @@ /* --- Transport Layer Permission Tests (Negative Cases) --- */ static priskv_resp_status last_status; static uint64_t last_token; +/* Thread-local slots for concurrent tests */ +static __thread priskv_resp_status tls_status; +static __thread uint64_t tls_token; static int mock_send_response(priskv_transport_conn *conn, uint64_t request_id, priskv_resp_status status, uint32_t length, uint64_t addr_offset, uint64_t token) { + /* Keep global values for single-threaded tests */ last_status = status; last_token = token; + /* Also record into TLS for multi-threaded tests */ + tls_status = status; + tls_token = token; return 0; } @@ -166,6 +173,286 @@ static void test_kv_transport_permissions(void *kv) g_transport_driver = old_driver; } +/* --- Combined flow: Alloc + Seal (PIN), then Acquire (PIN) + Release (UNPIN) twice --- */ +/* --- Combined flow (multithreaded): Alloc + Seal (PIN), then N concurrent Acquire (PIN) + * followed by N concurrent Release (UNPIN), finally manual unpin to zero. --- */ +/* Combo worker semantics: + * Phase 1 (prefill): + * - Role A: SEAL + PIN (allocates a private node and registers an ALLOC token) + * - Role B: ACQUIRE + PIN, then RELEASE (without UNPIN) to keep the pin + * Random short wait + * Phase 2 (decode): ACQUIRE (no PIN), then RELEASE (with UNPIN) for threads that pinned in prefill + */ +typedef enum { + WORKER_PREFILL_SEAL = 0, + WORKER_PREFILL_ACQ_NO_UNPIN = 1, +} worker_role; + +typedef struct combo_worker_arg { + priskv_transport_conn *conn; + const char *key; + uint16_t keylen; + pthread_barrier_t *start_barrier; + worker_role role; + uint64_t seal_token_host; /* valid when role == WORKER_PREFILL_SEAL */ + priskv_resp_status status_prefill; + priskv_resp_status status_decode_acq; + priskv_resp_status status_decode_rel; + int prefill_pinned; /* whether prefill phase left a pin */ +} combo_worker_arg; + +static void build_and_submit_req(priskv_transport_conn *conn, uint16_t cmd, uint32_t flags, + const void *payload, uint16_t payload_len) +{ + size_t req_size = sizeof(priskv_request) + payload_len; + uint8_t req_buf[req_size]; + priskv_request *req = (priskv_request *)req_buf; + memset(req, 0, req_size); + req->command = htobe16(cmd); + req->flags = htobe32(flags); + req->nsgl = htobe16(0); + req->key_length = htobe16(payload_len); + if (payload_len && payload) + memcpy((uint8_t *)req + sizeof(priskv_request), payload, payload_len); + priskv_transport_handle_recv(conn, req, (uint16_t)req_size); +} + +/* Helper: ACQUIRE + PIN with bounded retries to tolerate publish windows */ +/* Helper: ACQUIRE with flags and bounded retries; returns status and token */ +static void acquire_with_retry_flags(priskv_transport_conn *conn, const char *key, uint16_t keylen, + uint32_t flags, + priskv_resp_status *status_out, uint64_t *token_out, + int max_retries, useconds_t retry_us, int *attempts_out) +{ + priskv_resp_status st = PRISKV_RESP_STATUS_SERVER_ERROR; + uint64_t tok = 0; + int attempt = 0; + for (attempt = 0; attempt < max_retries; attempt++) { + build_and_submit_req(conn, PRISKV_COMMAND_ACQUIRE, flags, + key, keylen); + st = tls_status; + tok = tls_token; + if (st == PRISKV_RESP_STATUS_OK && tok != 0) break; + usleep(retry_us); + } + if (status_out) *status_out = st; + if (token_out) *token_out = tok; + if (attempts_out) *attempts_out = attempt; + if (!(st == PRISKV_RESP_STATUS_OK && tok != 0)) { + printf("TEST TRANSPORT: ACQUIRE(flags=0x%x) failed after %d retries (status=%s, token=%lu)\n", + flags, + attempt, priskv_resp_status_str(st), tok); + } +} + +static void *combo_worker_thread(void *arg) +{ + combo_worker_arg *a = (combo_worker_arg *)arg; + pthread_barrier_wait(a->start_barrier); + + /* Prefill phase */ + if (a->role == WORKER_PREFILL_SEAL) { + /* In-thread: alloc private node -> register ALLOC token -> SEAL + PIN */ + uint8_t *val_ptr = NULL; + void *node = NULL; + int s = priskv_alloc_node_private(a->conn->kv, (uint8_t *)a->key, a->keylen, + &val_ptr, 128, PRISKV_KEY_MAX_TIMEOUT, &node); + if (s == PRISKV_RESP_STATUS_OK && node) { + uint64_t token = priskv_transport_token_add(a->conn, node, PRISKV_TOKEN_TYPE_ALLOC); + if (token) { + uint64_t be_token = htobe64(token); + build_and_submit_req(a->conn, PRISKV_COMMAND_SEAL, PRISKV_REQ_FLAG_PIN_ON_SEAL, + &be_token, sizeof(uint64_t)); + a->status_prefill = tls_status; + a->prefill_pinned = (tls_status == PRISKV_RESP_STATUS_OK) ? 1 : 0; + } else { + a->status_prefill = PRISKV_RESP_STATUS_SERVER_ERROR; + a->prefill_pinned = 0; + priskv_get_key_end(node); + } + } else { + a->status_prefill = s; + a->prefill_pinned = 0; + } + } else { /* WORKER_PREFILL_ACQ_NO_UNPIN */ + a->prefill_pinned = 0; + priskv_resp_status st; + uint64_t tok; + int attempts; + acquire_with_retry_flags(a->conn, a->key, a->keylen, PRISKV_REQ_FLAG_PIN_ON_ACQUIRE, + &st, &tok, 5, 1000, &attempts); + a->status_prefill = st; + if (st == PRISKV_RESP_STATUS_OK && tok != 0) { + /* RELEASE without UNPIN to keep the pin */ + uint64_t be_token = htobe64(tok); + build_and_submit_req(a->conn, PRISKV_COMMAND_RELEASE, 0, &be_token, sizeof(uint64_t)); + a->prefill_pinned = 1; + } + } + + /* Random wait 0~5 ms */ + usleep((useconds_t)(random() % 5000)); + + /* Decode phase: ACQUIRE (no PIN), then RELEASE; attach UNPIN if this thread pinned in prefill */ + /* Decode ACQUIRE with retries (no PIN); only RELEASE on success */ + priskv_resp_status st_dec; + uint64_t tok_dec; + int attempts_dec; + acquire_with_retry_flags(a->conn, a->key, a->keylen, 0, + &st_dec, &tok_dec, 5, 1000, &attempts_dec); + a->status_decode_acq = st_dec; + if (st_dec == PRISKV_RESP_STATUS_OK && tok_dec != 0) { + uint64_t be_token2 = htobe64(tok_dec); + build_and_submit_req(a->conn, PRISKV_COMMAND_RELEASE, + a->prefill_pinned ? PRISKV_REQ_FLAG_UNPIN_ON_RELEASE : 0, + &be_token2, sizeof(uint64_t)); + a->status_decode_rel = tls_status; + } else { + a->status_decode_rel = st_dec; /* propagate failure */ + } + return NULL; +} + +static void test_kv_transport_alloc_seal_pin_acquire_release_unpin_combo(void *kv, int nthreads, int iters) +{ + priskv_transport_driver mock_driver = { + .name = "mock", + .send_response = mock_send_response, + .request_key_off = mock_request_key_off, + .request_key = mock_request_key, + .recv_req = mock_recv_req, + }; + priskv_transport_driver *old_driver = g_transport_driver; + g_transport_driver = &mock_driver; + + priskv_transport_conn conn = (priskv_transport_conn){0}; + conn.kv = kv; + conn.conn_cap.max_key_length = MAX_KEY_LENGTH; + conn.conn_cap.max_sgl = 8; + pthread_spin_init(&conn.lock, PTHREAD_PROCESS_PRIVATE); + + const char *key = "combo_pin_unpin_key"; + uint16_t keylen = (uint16_t)(strlen(key) + 1); + uint8_t *val_ptr = NULL; + + for (int iter = 0; iter < iters; iter++) { + printf("TEST TRANSPORT: COMBO iter %d/%d (threads=%d)\n", iter, iters, nthreads); + + /* Record stats baseline for this iteration */ + uint64_t pin_before = priskv_get_pin_ops(kv); + uint64_t unpin_before = priskv_get_unpin_ops(kv); + + /* 0) Prepare an initial published version so ACQUIRE can succeed */ + void *keynode_alloc = NULL; + int s = priskv_alloc_node_private(kv, (uint8_t *)key, keylen, &val_ptr, 128, + PRISKV_KEY_MAX_TIMEOUT, &keynode_alloc); + assert(s == PRISKV_RESP_STATUS_OK && keynode_alloc); + uint64_t alloc_token = priskv_transport_token_add(&conn, keynode_alloc, PRISKV_TOKEN_TYPE_ALLOC); + uint64_t be_token = htobe64(alloc_token); + last_status = -1; + do_mock_req_with_flags(&conn, PRISKV_COMMAND_SEAL, &be_token, sizeof(uint64_t), + PRISKV_REQ_FLAG_PIN_ON_SEAL); + assert(last_status == PRISKV_RESP_STATUS_OK); + + /* pin_count should now be 1 */ + uint32_t vlen = 0; + void *latest = NULL; + priskv_get_key(kv, (uint8_t *)key, keylen, &val_ptr, &vlen, &latest); + assert(latest); + priskv_key *kn = (priskv_key *)latest; + assert(kn->pin_count == 1); + priskv_get_key_end(latest); + + /* 1) Half of the threads perform SEAL prefill (each allocs its own private node) */ + int n_seal = nthreads / 2; + + /* 2) Launch workers: half do SEAL prefill; the other half do ACQUIRE+RELEASE (no UNPIN) prefill; then decode */ + pthread_barrier_t start_barrier; + pthread_barrier_init(&start_barrier, NULL, (unsigned int)nthreads); + pthread_t *ths = calloc((size_t)nthreads, sizeof(pthread_t)); + combo_worker_arg *wargs = calloc((size_t)nthreads, sizeof(combo_worker_arg)); + for (int i = 0; i < nthreads; i++) { + wargs[i].conn = &conn; + wargs[i].key = key; + wargs[i].keylen = keylen; + wargs[i].start_barrier = &start_barrier; + if (i < n_seal) { + wargs[i].role = WORKER_PREFILL_SEAL; + wargs[i].seal_token_host = 0; /* not used */ + } else { + wargs[i].role = WORKER_PREFILL_ACQ_NO_UNPIN; + wargs[i].seal_token_host = 0; + } + pthread_create(&ths[i], NULL, combo_worker_thread, &wargs[i]); + } + for (int i = 0; i < nthreads; i++) { + pthread_join(ths[i], NULL); + } + pthread_barrier_destroy(&start_barrier); + + /* 3) Check pin_count: after decode UNPIN for successful prefill threads, it should be 1 */ + int sum_prefill_pins = 0; + for (int i = 0; i < nthreads; i++) sum_prefill_pins += wargs[i].prefill_pinned; + priskv_get_key(kv, (uint8_t *)key, keylen, &val_ptr, &vlen, &latest); + assert(latest); + kn = (priskv_key *)latest; + if (kn->pin_count != 1u) { + printf("TEST TRANSPORT: COMBO iter %d pin_count expected %d, got %u [FAILED]\n", + iter, 1, kn->pin_count); + } + priskv_get_key_end(latest); + + // 4) Delta stats verification: + // - No PIN in decode phase + // - Prefill successful PINs: sum_prefill_pins; initial pin-on-seal counts as 1 + // => delta_pin_ops >= 1 + sum_prefill_pins + // - Decode UNPIN for successful prefill threads: sum_prefill_pins; then 1 manual cleanup UNPIN + // => delta_unpin_ops >= sum_prefill_pins + 1 + + /* Manually unpin the remaining pin to reach 0 for cleanup (should be 1 here) */ + priskv_get_key(kv, (uint8_t *)key, keylen, &val_ptr, &vlen, &latest); + assert(latest); + for (int i = 0; i < 1; i++) { + priskv_resp_status ur = priskv_key_unpin_latest(kv, latest); + assert(ur == PRISKV_RESP_STATUS_OK); + } + priskv_get_key_end(latest); + + priskv_get_key(kv, (uint8_t *)key, keylen, &val_ptr, &vlen, &latest); + assert(latest); + kn = (priskv_key *)latest; + assert(kn->pin_count == 0); + priskv_get_key_end(latest); + + /* 5) Verify deltas and cleanup */ + uint64_t pin_after = priskv_get_pin_ops(kv); + uint64_t unpin_after = priskv_get_unpin_ops(kv); + uint64_t delta_pin = pin_after - pin_before; + uint64_t delta_unpin = unpin_after - unpin_before; + uint64_t expect_min_pin = (uint64_t)(1 + sum_prefill_pins); + uint64_t expect_min_unpin = (uint64_t)(sum_prefill_pins + 1); + if (delta_pin < expect_min_pin) { + printf("TEST TRANSPORT: COMBO iter %d delta_pin expected >= %lu, got %lu [FAILED]\n", + iter, expect_min_pin, delta_pin); + assert(0); + } + if (delta_unpin < expect_min_unpin) { + printf("TEST TRANSPORT: COMBO iter %d delta_unpin expected >= %lu, got %lu [FAILED]\n", + iter, expect_min_unpin, delta_unpin); + assert(0); + } + + priskv_delete_key(kv, (uint8_t *)key, keylen); + + free(ths); + free(wargs); + } + + priskv_transport_token_cleanup(&conn); + pthread_spin_destroy(&conn.lock); + g_transport_driver = old_driver; +} + /* --- DROP behavior & permission tests --- */ static void test_kv_transport_drop_behavior(void *kv) { @@ -533,19 +820,19 @@ static void test_kv_transport_concurrent_seal_pin(void *kv) pthread_join(th2, NULL); pthread_barrier_destroy(&barrier); - /* Verify: pin_count on latest version == 2 */ + /* Verify: pin_count on latest version == 2, and unpin using the latest handle */ uint32_t vlen = 0; void *latest = NULL; priskv_get_key(kv, (uint8_t *)key, keylen, &val_ptr, &vlen, &latest); assert(latest); priskv_key *kn = (priskv_key *)latest; assert(kn->pin_count == 2); - priskv_get_key_end(latest); - /* Two consecutive UNPINs should succeed, the third should be UNPIN_NOT_CLOSED */ - priskv_resp_status r1 = priskv_key_unpin_latest(kv, node1); - priskv_resp_status r2 = priskv_key_unpin_latest(kv, node1); - priskv_resp_status r3 = priskv_key_unpin_latest(kv, node1); + /* Hold the latest reference while unpinning to avoid stale-handle issues */ + priskv_resp_status r1 = priskv_key_unpin_latest(kv, latest); + priskv_resp_status r2 = priskv_key_unpin_latest(kv, latest); + priskv_resp_status r3 = priskv_key_unpin_latest(kv, latest); + priskv_get_key_end(latest); assert(r1 == PRISKV_RESP_STATUS_OK); assert(r2 == PRISKV_RESP_STATUS_OK); assert(r3 == PRISKV_RESP_STATUS_UNPIN_NOT_CLOSED); @@ -800,7 +1087,7 @@ static void test_kv_transport_unpin_no_such_key(void *kv) g_transport_driver = old_driver; } -int main() +int main(int argc, char **argv) { uint8_t *key_base, *value_base; void *kv; @@ -813,13 +1100,27 @@ int main() VALUE_BLOCKS, NULL /* mf_ctx */); assert(kv); - printf("TEST TRANSPORT: Running transport layer permission tests...\n"); + /* Optional args: argv[1]=combo_nthreads, argv[2]=combo_iters */ + int combo_nthreads = 8; + int combo_iters = 1; + if (argc >= 2) { + int v = atoi(argv[1]); + if (v > 0) combo_nthreads = v; + } + if (argc >= 3) { + int v = atoi(argv[2]); + if (v > 0) combo_iters = v; + } + + printf("TEST TRANSPORT: Running transport layer permission tests... (combo threads=%d iters=%d)\n", + combo_nthreads, combo_iters); test_kv_transport_permissions(kv); test_kv_transport_drop_behavior(kv); test_kv_transport_param_validation(kv); test_kv_transport_alloc_token_add_fail(kv); test_kv_transport_pin_on_seal(kv); test_kv_transport_pin_and_unpin(kv); + test_kv_transport_alloc_seal_pin_acquire_release_unpin_combo(kv, combo_nthreads, combo_iters); test_kv_transport_unpin_no_such_key(kv); test_kv_transport_concurrent_seal_pin(kv); /* High-concurrency stress: threads and iterations are configurable; use 8*200 to expose atomicity issues */