Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ pypriskv/priskv.egg-info/
pypriskv/priskv/*.so
include/priskv-version.h
pypriskv/priskv/__pycache__/
pypriskv/**/__pycache__/
**/__pycache__/
*.pyc
man/priskv-server.1
.vscode/
tmp/
Expand Down
193 changes: 193 additions & 0 deletions SETUP_UCX_TCP.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# PrisKV: From Zero to Working (UCX TCP + Python client)

This guide targets developer environments without RDMA hardware and uses UCX over TCP for connectivity.
The goal is:
1. Verify `import priskv` works (Python client is installed)
2. Connect using `PriskvClient` to a running `priskv-server`
3. Optionally validate end-to-end `set/get`

---

## 0. Prerequisites (Debian/Ubuntu)

### System dependencies

```bash
apt-get update
apt-get install -y \
git gcc make cmake \
librdmacm-dev rdma-core libibverbs-dev \
libncurses5-dev libmount-dev libevent-dev libssl-dev \
dpkg-dev debhelper \
pkg-config \
python3-pybind11 python3-dev python3-pip \
libonig-dev libhiredis-dev liburing-dev \
libucx-dev libucx0
```

### Python build tooling

```bash
python3 -m venv .venv
source .venv/bin/activate
pip3 install pybind11 yapf==0.32.0
```

If you are running as `root` inside a container, `sudo` is usually not required.

---

## 1. Prepare the source tree

```bash
cd /PrisKV
```

If the repository uses submodules (e.g., `json-c`), initialize them:

```bash
git submodule update --init --recursive
```

---

## 2. Build `priskv-server` (C/C++)

Recommended: build everything once, then rebuild server if needed:

```bash
cd /PrisKV
make
```

Or build only the server (preferred if build without RDMA):

```bash
cd /PrisKV
make server
```

---

## 3. Build and install the Python client (`pypriskv`)

If you use a virtual environment (recommended), activate it first:

```bash
source .venv/bin/activate
pip install -U pip setuptools wheel
```

### Option A: Editable install (recommended for development)

```bash
cd /PrisKV
make all
cd pypriskv
pip install --no-build-isolation -v -e .
```

### Option B: Install from wheel

```bash
cd /PrisKV
make all
cd pypriskv
python3 setup.py build_ext bdist_wheel
pip install ./dist/*.whl
```

---

## 4. Configure runtime environment variables for UCX TCP

Make sure both server and client use the same transport configuration:

- `PRISKV_TRANSPORT=ucx`
- `UCX_TLS=tcp`
- If using direct mode (no Redis meta service): `PRISKV_CLIENT_DIRECT_MODE=y`

Example:

```bash
export PRISKV_TRANSPORT=ucx
export UCX_TLS=tcp
export PRISKV_CLIENT_DIRECT_MODE=y
```

Optional debugging:

```bash
export PRISKV_LOG_LEVEL=debug
```

---

## 5. Start `priskv-server` (UCX TCP)

```bash
source .venv/bin/activate
cd /PrisKV
export PRISKV_TRANSPORT=ucx
export UCX_TLS=tcp
export PRISKV_CLIENT_DIRECT_MODE=y
export PRISKV_USE_SHM=n

./server/priskv-server -a 127.0.0.1 -p 6379 --acl any
```

Expected server log includes:
- `UCX: <...> ready`

---

## 6. Verify connectivity from Python

Activate your venv:

```bash
source .venv/bin/activate
```

Then run a minimal connectivity check:

```bash
export PRISKV_TRANSPORT=ucx
export UCX_TLS=tcp
export PRISKV_CLIENT_DIRECT_MODE=y

python - <<'PY'
import priskv

c = priskv.PriskvClient("127.0.0.1", 6379, "kvcache-redis")
print("connected")
c.close()
print("closed")
PY
```

Optional: validate `set/get`:

```bash
python - <<'PY'
import priskv

c = priskv.PriskvClient("127.0.0.1", 6379, "kvcache-redis")
print("setstr:", c.setstr("k1", "v1", 2000))
print("getstr:", c.getstr("k1"))
c.close()
PY
```

---

## 7. Readiness checklist for developers

- Successful `import priskv` indicates the Python extension is built/installed correctly.
- Server logs show `UCX ... ready` indicates UCX transport is initialized.
- Client logs show `established` indicates handshake/connection succeeded.
- If `set/get` hangs, investigate the end-to-end request/response path:
- server response generation
- client response receive callback
- protocol struct decoding and completion-info handling

9 changes: 8 additions & 1 deletion client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,14 @@ VALKEY_LDFLAGS = -I$(VALKEY_INCLUDE_PATH)
PRISKV_TARGETS_ALL = priskv-client priskv-benchmark priskv-example priskv-test_runtime
PRISKV_TARGETS = priskv-client priskv-benchmark
PRISKV_TARGETS_SRCS = $(patsubst priskv-%, %.c, $(PRISKV_TARGETS_ALL))
SRCS = $(filter-out $(PRISKV_TARGETS_SRCS) $(VALKEY_BENCHMARK_SRC), $(wildcard *.c transport/*.c))

ifneq (,$(filter $(WITH_RDMA),yes YES y Y 1))
CFLAGS += -DWITH_RDMA
else
RDMA_EXCLUDE = rdma.c transport/rdma.c
endif

SRCS = $(filter-out $(PRISKV_TARGETS_SRCS) $(VALKEY_BENCHMARK_SRC) $(RDMA_EXCLUDE), $(wildcard *.c transport/*.c))
OBJS := $(SRCS:%.c=%.o)
DEPS := $(OBJS:%.o=%.d)

Expand Down
5 changes: 3 additions & 2 deletions client/benchmark.c
Original file line number Diff line number Diff line change
Expand Up @@ -996,8 +996,9 @@ static void priskv_drv_get(void *ctx, const char *key, void *value, uint32_t val
zctx->value = value;
zctx->value_len = value_len;
priskv_ctx->job->last_stage = "ACQUIRE";
priskv_acquire_async(priskv_ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, false /* pin_on_acquire */,
(uint64_t)zctx, zc_acquire_cb);
priskv_acquire_async(priskv_ctx->client, key, PRISKV_KEY_MAX_TIMEOUT,
false /* pin_on_acquire */, 0 /* pin_ttl_ms */, (uint64_t)zctx,
zc_acquire_cb);
return;
}
/* Remove duplicate ZeroCopy GET branch */
Expand Down
6 changes: 4 additions & 2 deletions client/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ static void get_handler_base(client_context *ctx, char *args, bool acquire)
priskv_memory_region region = {0};
printf("ACQUIRE key=%s\n", key);
/* Do not pin on acquire by default from CLI */
status = priskv_acquire(ctx->client, key, false, 0 /* pin_ttl_ms */, &region);
status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, false, 0 /* pin_ttl_ms */,
&region);
if (status != PRISKV_STATUS_OK) {
printf("Failed to GET, status(%d): %s\n", status, priskv_status_str(status));
return;
Expand Down Expand Up @@ -477,7 +478,8 @@ static void acquire_only_handler(client_context *ctx, char *args)

/* Align output field name with CLI flag semantics */
printf("ACQUIRE key=%s [PIN=%d, TTL(ms)=%" PRIu64 "]\n", key, pin_on_acquire, pin_ttl_ms);
status = priskv_acquire(ctx->client, key, pin_on_acquire, pin_ttl_ms, &region);
status = priskv_acquire(ctx->client, key, PRISKV_KEY_MAX_TIMEOUT, pin_on_acquire, pin_ttl_ms,
&region);
printf("ACQUIRE status(%d): %s, addr %p, length %u, token 0x%lx\n", status,
priskv_status_str(status), (void *)region.addr, region.length, region.token);
if (status == PRISKV_STATUS_OK) {
Expand Down
12 changes: 6 additions & 6 deletions client/priskv.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,10 @@ int priskv_alloc_async(priskv_client *client, const char *key, uint32_t alloc_le
int priskv_seal_async(priskv_client *client, const uint64_t *token, bool pin_on_seal,
uint64_t pin_ttl_ms, uint64_t request_id, priskv_generic_cb cb);

/* Acquire memory region for zero copy read (pin_ttl_ms is PIN TTL in ms; 0 uses server default) */
int priskv_acquire_async(priskv_client *client, const char *key, bool pin_on_acquire,
uint64_t pin_ttl_ms, uint64_t request_id, priskv_generic_cb cb);
/* Acquire: @timeout is key/op timeout for transport; @pin_ttl_ms is PIN TTL (0 = server default). */
int priskv_acquire_async(priskv_client *client, const char *key, uint64_t timeout,
bool pin_on_acquire, uint64_t pin_ttl_ms, uint64_t request_id,
priskv_generic_cb cb);

/* Release memory region (by token pointer, reuse key field) */
int priskv_release_async(priskv_client *client, const uint64_t *token, bool unpin_on_release,
Expand Down Expand Up @@ -224,9 +225,8 @@ int priskv_alloc(priskv_client *client, const char *key, uint32_t alloc_length,
int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal,
uint64_t pin_ttl_ms);

int priskv_acquire(priskv_client *client, const char *key, bool pin_on_acquire, uint64_t pin_ttl_ms,
priskv_memory_region *region);

int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout,
bool pin_on_acquire, uint64_t pin_ttl_ms, priskv_memory_region *region);
int priskv_release(priskv_client *client, const uint64_t *token, bool unpin_on_release);

int priskv_drop(priskv_client *client, const uint64_t *token);
Expand Down
7 changes: 3 additions & 4 deletions client/sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,11 @@ int priskv_seal(priskv_client *client, const uint64_t *token, bool pin_on_seal,
return req_sync.status;
}

int priskv_acquire(priskv_client *client, const char *key, bool pin_on_acquire, uint64_t pin_ttl_ms,
priskv_memory_region *region)
int priskv_acquire(priskv_client *client, const char *key, uint64_t timeout,
bool pin_on_acquire, uint64_t pin_ttl_ms, priskv_memory_region *region)
{
priskv_transport_zero_copy_req_sync req_sync = {.status = 0xffff, .done = false};
/* pin_ttl_ms is per-request PIN TTL in ms; 0 uses server default */
priskv_acquire_async(client, key, pin_on_acquire, pin_ttl_ms, (uint64_t)&req_sync,
priskv_acquire_async(client, key, timeout, pin_on_acquire, pin_ttl_ms, (uint64_t)&req_sync,
priskv_zero_copy_req_sync_cb);

priskv_sync_wait(client, &req_sync.done);
Expand Down
15 changes: 9 additions & 6 deletions client/transport/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
priskv_transport_driver *g_client_driver = NULL;

extern priskv_transport_driver priskv_transport_driver_ucx;
#ifdef WITH_RDMA
extern priskv_transport_driver priskv_transport_driver_rdma;
#endif

static int priskv_build_check(void)
{
Expand Down Expand Up @@ -73,10 +75,12 @@ static void __attribute__((constructor)) priskv_client_transport_init(void)
driver = &priskv_transport_driver_ucx;
priskv_log_notice("Using UCX transport backend\n");
break;
#ifdef WITH_RDMA
case PRISKV_TRANSPORT_BACKEND_RDMA:
driver = &priskv_transport_driver_rdma;
priskv_log_notice("Using RDMA transport backend\n");
break;
#endif
default:
priskv_log_error("Unknown transport backend: %d\n", backend);
break;
Expand Down Expand Up @@ -429,17 +433,16 @@ int priskv_seal_async(priskv_client *client, const uint64_t *token, bool pin_on_
{
uint32_t flags = pin_on_seal ? PRISKV_REQ_FLAG_PIN_ON_SEAL : 0;
priskv_send_command(client, request_id, (const char *)token, 0 /* alloc_length */, NULL, 0,
0 /* key_expiry_timeout */, pin_ttl_ms /* pin_ttl_ms */,
PRISKV_COMMAND_SEAL, flags, cb);
0 /* key_expiry_timeout */, pin_ttl_ms, PRISKV_COMMAND_SEAL, flags, cb);
return 0;
}

int priskv_acquire_async(priskv_client *client, const char *key, bool pin_on_acquire,
uint64_t pin_ttl_ms, uint64_t request_id, priskv_generic_cb cb)
int priskv_acquire_async(priskv_client *client, const char *key, uint64_t timeout,
bool pin_on_acquire, uint64_t pin_ttl_ms, uint64_t request_id,
priskv_generic_cb cb)
{
uint32_t flags = pin_on_acquire ? PRISKV_REQ_FLAG_PIN_ON_ACQUIRE : 0;
priskv_send_command(client, request_id, key, 0 /* alloc_length */, NULL, 0,
0 /* key_expiry_timeout */, pin_ttl_ms /* pin_ttl_ms */,
priskv_send_command(client, request_id, key, 0 /* alloc_length */, NULL, 0, timeout, pin_ttl_ms,
PRISKV_COMMAND_ACQUIRE, flags, cb);
return 0;
}
Expand Down
Loading
Loading