Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/benchmark.c
Original file line number Diff line number Diff line change
Expand Up @@ -963,8 +963,8 @@ static void zc_alloc_cb(uint64_t rid, priskv_status status, void *result)
zctx->pctx->job->mm->memcpy((void *)region->addr, zctx->value, copy_len);
zctx->token = region->token;
zctx->pctx->job->last_stage = "SEAL";
priskv_seal_async(zctx->pctx->client, &zctx->token, false /* pin_on_seal */, (uint64_t)zctx,
zc_seal_cb);
priskv_seal_async(zctx->pctx->client, &zctx->token, false /* pin_on_seal */, 0 /* pin_ttl_ms */,
(uint64_t)zctx, zc_seal_cb);
}

/* ZeroCopy DROP callback is no longer used (published keys use DELETE semantics) */
Expand Down
40 changes: 23 additions & 17 deletions client/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ static void set_handler_base(client_context *ctx, char *args, bool alloc)
printf("ALLOC_SET status(%d): %s, addr %p, length %u, token 0x%lx\n", status,
priskv_status_str(status), (void *)region.addr, region.length, region.token);
memcpy((void *)region.addr, value, (size_t)region.length);
status = priskv_seal(ctx->client, &region.token, pin_on_seal);
status = priskv_seal(ctx->client, &region.token, pin_on_seal, 0 /* pin_ttl_ms */);
if (status != PRISKV_STATUS_OK) {
printf("Failed to SEAL, status(%d): %s\n", status, priskv_status_str(status));
return;
Expand Down Expand Up @@ -275,7 +275,7 @@ static void get_handler_base(client_context *ctx, char *args, bool acquire)
priskv_memory_region region = {0};
printf("ACQUIRE key=%s\n", key);
/* Do not pin on acquire by default from CLI */
status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, false, &region);
status = priskv_acquire(ctx->client, key, false, 0 /* pin_ttl_ms */, &region);
if (status != PRISKV_STATUS_OK) {
printf("Failed to GET, status(%d): %s\n", status, priskv_status_str(status));
return;
Expand Down Expand Up @@ -392,6 +392,7 @@ static void seal_token_handler(client_context *ctx, char *args)
uint64_t token = 0;
priskv_status status;
bool pin_on_seal = false;
uint64_t pin_ttl_ms = 0; /* default: server-side TTL */

tokstr = strtok_r(args, " ", &args);
if (!tokstr) {
Expand All @@ -414,33 +415,36 @@ static void seal_token_handler(client_context *ctx, char *args)
if (!strcmp(flag, "PIN")) {
pin_on_seal = true;
} else if (!strcmp(flag, "TTL")) {
/* TODO(wangyi): TTL passthrough is not implemented; parse and discard for now */
char *ttl = strtok_r(args, " ", &args);
if (!ttl || !strlen(ttl)) {
printf("%s\n", invalid_args_msg);
return;
}
errno = 0;
pin_ttl_ms = strtoull(ttl, &str_end, 10);
if (errno > 0 || str_end == ttl || *str_end != '\0') {
printf("%s\n", invalid_args_msg);
return;
}
} else {
printf("%s\n", invalid_args_msg);
return;
}
}

printf("SEAL token=%" PRIu64 " [PIN=%d]\n", token, pin_on_seal);
/* TODO(wangyi): Support per-command TTL for pin-on-seal (e.g., 'PIN TTL N')
* - Parse TTL value and pass through protocol once pin_ttl_ms is supported.
* - Default to server-side TTL when not provided.
*/
status = priskv_seal(ctx->client, &token, pin_on_seal);
printf("SEAL token=%" PRIu64 " [PIN=%d, TTL(ms)=%" PRIu64 "]\n", token, pin_on_seal,
pin_ttl_ms);
status = priskv_seal(ctx->client, &token, pin_on_seal, pin_ttl_ms);
printf("SEAL status(%d): %s\n", status, priskv_status_str(status));
}

static void acquire_only_handler(client_context *ctx, char *args)
{
char *key;
char *key, *str_end;
priskv_status status;
priskv_memory_region region = {0};
bool pin_on_acquire = false;
uint64_t pin_ttl_ms = 0; /* default: server-side TTL */

key = strtok_r(args, " ", &args);
if (!key) {
Expand All @@ -454,24 +458,26 @@ static void acquire_only_handler(client_context *ctx, char *args)
if (!strcmp(flag, "PIN")) {
pin_on_acquire = true;
} else if (!strcmp(flag, "TTL")) {
/* TODO(wangyi): TTL passthrough is not implemented; parse and discard for now */
char *ttl = strtok_r(args, " ", &args);
if (!ttl || !strlen(ttl)) {
printf("%s\n", invalid_args_msg);
return;
}
errno = 0;
pin_ttl_ms = strtoull(ttl, &str_end, 10);
if (errno > 0 || str_end == ttl || *str_end != '\0') {
printf("%s\n", invalid_args_msg);
return;
}
} else {
printf("%s\n", invalid_args_msg);
return;
}
}

/* Align output field name with CLI flag semantics */
printf("ACQUIRE key=%s [PIN=%d]\n", key, pin_on_acquire);
/* TODO(wangyi): Support per-command TTL for pin-on-acquire (e.g., 'PIN TTL N')
* - Parse TTL value and pass through protocol once pin_ttl_ms is supported.
*/
status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, pin_on_acquire, &region);
printf("ACQUIRE key=%s [PIN=%d, TTL(ms)=%" PRIu64 "]\n", key, pin_on_acquire, pin_ttl_ms);
status = priskv_acquire(ctx->client, key, pin_on_acquire, pin_ttl_ms, &region);
printf("ACQUIRE status(%d): %s, addr %p, length %u, token 0x%lx\n", status,
priskv_status_str(status), (void *)region.addr, region.length, region.token);
if (status == PRISKV_STATUS_OK) {
Expand Down Expand Up @@ -545,7 +551,7 @@ static void drop_token_handler(client_context *ctx, char *args)
return;
}
}
printf("DROP token=% \n" PRIu64, token);
printf("DROP token=%" PRIu64 "\n", token);
status = priskv_drop(ctx->client, &token);
printf("DROP status(%d): %s\n", status, priskv_status_str(status));
}
Expand Down
22 changes: 12 additions & 10 deletions client/priskv.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,24 @@ int priskv_flush_async(priskv_client *client, const char *regex, uint64_t reques
int priskv_alloc_async(priskv_client *client, const char *key, uint32_t alloc_length,
uint64_t timeout, uint64_t request_id, priskv_generic_cb cb);

/* Seal memory region alloced for write (by token pointer, reuse key field) */
/* Seal memory region alloced for write (by token pointer, reuse key field)
* When pin_on_seal is true, pass TTL via pin_ttl_ms (0 uses server default).
*/
int priskv_seal_async(priskv_client *client, const uint64_t *token, bool pin_on_seal,
uint64_t request_id, priskv_generic_cb cb);
uint64_t pin_ttl_ms, uint64_t request_id, priskv_generic_cb cb);

/* Acquire memory region for zero copy read */
int priskv_acquire_async(priskv_client *client, const char *key, uint64_t timeout,
bool pin_on_acquire, uint64_t request_id, priskv_generic_cb cb);
/* Acquire memory region for zero copy read (pin_ttl_ms is PIN TTL in ms; 0 uses server default) */
int priskv_acquire_async(priskv_client *client, const char *key, bool pin_on_acquire,
uint64_t pin_ttl_ms, uint64_t request_id, priskv_generic_cb cb);

/* Release memory region (by token pointer, reuse key field) */
int priskv_release_async(priskv_client *client, const uint64_t *token, bool unpin_on_release,
uint64_t request_id, priskv_generic_cb cb);


/* Drop memory region (by token pointer, reuse key field) */
int priskv_drop_async(priskv_client *client, const uint64_t *token, uint64_t request_id,
priskv_generic_cb cb);


/* for *KEYS* command */
typedef struct priskv_key {
char *key;
Expand Down Expand Up @@ -221,10 +221,12 @@ uint64_t priskv_capacity(priskv_client *client);

int priskv_alloc(priskv_client *client, const char *key, uint32_t alloc_length, uint64_t timeout,
priskv_memory_region *region);
int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal);
int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal,
uint64_t pin_ttl_ms);

int priskv_acquire(priskv_client *client, const char *key, bool pin_on_acquire, uint64_t pin_ttl_ms,
priskv_memory_region *region);

int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout,
bool pin_on_acquire, priskv_memory_region *region);
int priskv_release(priskv_client *client, const uint64_t *token, bool unpin_on_release);

int priskv_drop(priskv_client *client, const uint64_t *token);
Expand Down
26 changes: 15 additions & 11 deletions client/sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ static void priskv_common_sync_cb(uint64_t request_id, priskv_status status, voi
priskv_transport_req_sync *req_sync = (priskv_transport_req_sync *)request_id;
uint32_t valuelen = result ? *(uint32_t *)result : 0;

priskv_log_debug("priskv_common_sync_cb: callback request_id 0x%lx, status: %s[0x%x], length %d\n", request_id,
priskv_resp_status_str(status), status, valuelen);
priskv_log_debug(
"priskv_common_sync_cb: callback request_id 0x%lx, status: %s[0x%x], length %d\n",
request_id, priskv_resp_status_str(status), status, valuelen);
req_sync->status = status;
req_sync->valuelen = valuelen;
req_sync->done = true;
Expand All @@ -61,7 +62,8 @@ static inline int priskv_sync_wait(priskv_client *client, bool *done)
return 0;
}

int priskv_get(priskv_client *client, const char *key, priskv_sgl *sgl, uint16_t nsgl, uint32_t *valuelen)
int priskv_get(priskv_client *client, const char *key, priskv_sgl *sgl, uint16_t nsgl,
uint32_t *valuelen)
{
priskv_transport_req_sync req_sync = {.status = 0xffff, .done = false};

Expand All @@ -72,7 +74,8 @@ int priskv_get(priskv_client *client, const char *key, priskv_sgl *sgl, uint16_t
return req_sync.status;
}

int priskv_set(priskv_client *client, const char *key, priskv_sgl *sgl, uint16_t nsgl, uint64_t timeout)
int priskv_set(priskv_client *client, const char *key, priskv_sgl *sgl, uint16_t nsgl,
uint64_t timeout)
{
priskv_transport_req_sync req_sync = {.status = 0xffff, .done = false};

Expand Down Expand Up @@ -177,21 +180,22 @@ int priskv_alloc(priskv_client *client, const char *key, uint32_t alloc_length,
return req_sync.status;
}

int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal)
int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal, uint64_t pin_ttl_ms)
{
priskv_transport_req_sync req_sync = {.status = 0xffff, .done = false};
priskv_seal_async(client, token, pin_on_seal, (uint64_t)&req_sync,
priskv_seal_async(client, token, pin_on_seal, pin_ttl_ms, (uint64_t)&req_sync,
priskv_common_sync_cb);

priskv_sync_wait(client, &req_sync.done);
return req_sync.status;
}

int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout,
bool pin_on_acquire, priskv_memory_region *region)
int priskv_acquire(priskv_client *client, const char *key, bool pin_on_acquire, uint64_t pin_ttl_ms,
priskv_memory_region *region)
{
priskv_transport_zero_copy_req_sync req_sync = {.status = 0xffff, .done = false};
priskv_acquire_async(client, key, timeout, pin_on_acquire, (uint64_t)&req_sync,
/* pin_ttl_ms is per-request PIN TTL in ms; 0 uses server default */
priskv_acquire_async(client, key, pin_on_acquire, pin_ttl_ms, (uint64_t)&req_sync,
priskv_zero_copy_req_sync_cb);

priskv_sync_wait(client, &req_sync.done);
Expand Down Expand Up @@ -232,8 +236,8 @@ static void priskv_keys_sync_cb(uint64_t request_id, priskv_status status, void
{
priskv_transport_keys_sync *keys_req_sync = (priskv_transport_keys_sync *)request_id;

priskv_log_debug("priskv_keys_sync_cb: callback request_id 0x%lx, status: %s[0x%x]\n", request_id,
priskv_resp_status_str(status), status);
priskv_log_debug("priskv_keys_sync_cb: callback request_id 0x%lx, status: %s[0x%x]\n",
request_id, priskv_resp_status_str(status), status);
keys_req_sync->status = status;
keys_req_sync->done = true;

Expand Down
2 changes: 2 additions & 0 deletions client/transport/rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,7 @@ priskv_rdma_req_new(priskv_client *client, priskv_transport_conn *conn, uint64_t
rdma_req->main_wq = client->wq;
rdma_req->cmd = cmd;
rdma_req->timeout = timeout;
rdma_req->pin_ttl_ms = 0; /* default: not set */
rdma_req->key = strdup(key);
rdma_req->keylen = keylen;
rdma_req->request_id = request_id;
Expand Down Expand Up @@ -1256,6 +1257,7 @@ static int priskv_rdma_req_send(void *arg)
req->command = htobe16(rdma_req->cmd);
req->nsgl = htobe16(rdma_req->nsgl);
req->timeout = htobe64(rdma_req->timeout);
req->pin_ttl_ms = htobe64(rdma_req->pin_ttl_ms);
req->flags = htobe32(rdma_req->req_flags);
req->key_length = htobe16(rdma_req->keylen);

Expand Down
Loading
Loading