diff --git a/.bazelrc b/.bazelrc index 44ea711..ee90160 100644 --- a/.bazelrc +++ b/.bazelrc @@ -1,3 +1,7 @@ +# Import user-local overrides (not checked in). Create user.bazelrc to set +# machine-specific options, e.g. build --config=macos_arm64 +try-import %workspace%/user.bazelrc + build:asan --strip=never build:asan --copt -fsanitize=address build:asan --copt -DADDRESS_SANITIZER @@ -10,23 +14,24 @@ common --enable_platform_specific_config build --enable_platform_specific_config -# Apply this flag only when building on/for macOS +# macOS (auto-applied on all macOS hosts via --enable_platform_specific_config). +# Only arch-independent settings belong here. build:macos --macos_minimum_os=10.15 -# If builds fail with: xcrun: invalid DEVELOPER_DIR (.../CommandLineTools), missing xcrun -# fix the host (sudo xcode-select -s /Applications/Xcode.app/Contents/Developer, or -# xcode-select --install). See README "macOS: xcrun / DEVELOPER_DIR errors". -# Rust (crate_universe): on Darwin, default @platforms//host:host often does not match -# rules_rust platform config_settings, so generated target_compatible_with marks crates -# incompatible. Linux is unchanged (no --platforms in build:linux). -# Default here is Apple Silicon (aarch64). Intel macOS: add to your user .bazelrc, e.g. -# build --config=darwin_intel -# so --platforms overrides this after build:macos is applied. -build:macos --platforms=//platform/host:darwin_arm64 -# Tests must run on an execution platform compatible with --platforms (see default_test_toolchain_type). -build:macos --extra_execution_platforms=//platform/host:darwin_arm64 -build:darwin_intel --platforms=//platform/host:darwin_x86_64 -build:darwin_intel --extra_execution_platforms=//platform/host:darwin_x86_64 +# macOS architecture-specific configs. +# Rust crate_universe generates target_compatible_with constraints that require an +# explicit --platforms matching rules_rust config_settings. The default +# @local_config_platform//:host does not satisfy these, so one of the configs below +# must be selected when building Rust targets on macOS. +# +# Recommended: add one of these to your user.bazelrc (gitignored) for seamless builds: +# build --config=macos_arm64 # Apple Silicon +# build --config=macos_x86_64 # Intel +build:macos_arm64 --platforms=//platform/host:darwin_arm64 +build:macos_arm64 --extra_execution_platforms=//platform/host:darwin_arm64 + +build:macos_x86_64 --platforms=//platform/host:darwin_x86_64 +build:macos_x86_64 --extra_execution_platforms=//platform/host:darwin_x86_64 # For all builds, use C++17 build --cxxopt="-std=c++17" @@ -36,12 +41,6 @@ build --@rules_rust//rust/settings:experimental_use_cc_common_link=True build --copt="-Wextra" build --copt="-Wno-missing-field-initializers" -# For Apple Silicon -build:apple_silicon --cpu=darwin_arm64 -# Overrides build:macos --platforms for aarch64-apple-darwin (rules_rust / Rust deps). -build:apple_silicon --platforms=//platform/host:darwin_arm64 -build:apple_silicon --extra_execution_platforms=//platform/host:darwin_arm64 - # Common flags for Clang build:clang --action_env=BAZEL_COMPILER=clang build:clang --action_env=CC=clang --action_env=CXX=clang++ diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b753afb..7582b71 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ jobs: - os: ubuntu-latest - os: ubuntu-24.04-arm - os: macos-latest - bazel_flags: --config=apple_silicon + bazel_flags: --config=macos_arm64 steps: - name: Checkout code diff --git a/.gitignore b/.gitignore index 1445c3a..06acfdc 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ bazel-* .vscode/* build rust_client/target +user.bazelrc diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock index 096b90d..33f47be 100644 --- a/MODULE.bazel.lock +++ b/MODULE.bazel.lock @@ -473,7 +473,7 @@ "@@rules_rust+//crate_universe:extensions.bzl%crate": { "general": { "bzlTransitiveDigest": "VsXyRBEYrxEPjKv5xuDr8xZxVrGLapffONoRFF745UY=", - "usagesDigest": "5WDxT9K+SxYrIS9FaUwW+tJYNeHOgnCAd8yMA4TOjSE=", + "usagesDigest": "WT+9VNAI+JNu5cIQtmokGqxURKdUxR0PBqNgMUpcOXc=", "recordedInputs": [ "ENV:CARGO_BAZEL_DEBUG \\0", "ENV:CARGO_BAZEL_GENERATOR_SHA256 \\0", diff --git a/README.md b/README.md index bdf4cf0..d5f4e24 100644 --- a/README.md +++ b/README.md @@ -52,9 +52,10 @@ This uses Google's Bazel to build. You will need to download Bazel to build it. The build also needs some external libraries, but Bazel takes care of downloading them. The *.bazelrc* file contains some configuration options. -### To build on Mac Apple Silicon +### To build on Mac ``` -bazel build --config=apple_silicon ... +bazel build --config=macos_arm64 ... # Apple Silicon +bazel build --config=macos_x86_64 ... # Intel ``` ### To build on Linux @@ -377,6 +378,41 @@ public: }; ``` +### Convenience Free Functions + +For simple use cases where you don't need to reuse a client, Subspace provides +free functions that create a temporary client, perform the operation, and return +a standalone `Publisher` or `Subscriber`. The returned object keeps the +underlying client alive for its lifetime. + +```cpp +#include "client/client.h" + +// Create a publisher without managing a Client object. +auto pub_or = subspace::CreatePublisher("my_channel", + subspace::PublisherOptions{.slot_size = 1024, .num_slots = 10}); +if (!pub_or.ok()) { + // Handle error + return; +} +auto pub = std::move(*pub_or); + +// Create a subscriber without managing a Client object. +auto sub_or = subspace::CreateSubscriber("my_channel"); +if (!sub_or.ok()) { + // Handle error + return; +} +auto sub = std::move(*sub_or); +``` + +**Parameters:** +- `channel_name`: Name of the channel +- `opts`: `PublisherOptions` or `SubscriberOptions` (defaults apply) +- `server_socket` (default: `"/tmp/subspace"`): Path to the server socket +- `client_name` (default: `""`): Optional client name +- `c` (optional): Coroutine pointer for coroutine-aware mode + ## Publisher API ### Creating a Publisher diff --git a/client/BUILD.bazel b/client/BUILD.bazel index 359ec27..2a5a8d2 100644 --- a/client/BUILD.bazel +++ b/client/BUILD.bazel @@ -55,8 +55,12 @@ cc_test( srcs = ["client_test.cc"], data = [ "//server:subspace_server", - "//plugins:nop_plugin.so", - ], + ] + select({ + "//:macos_arm64": [], + "//:macos_x86_64": [], + "//:macos_default": [], + "//conditions:default": ["//plugins:nop_plugin.so"], + }), copts = [ "-Wno-missing-field-initializers", "-Wno-unused-parameter", @@ -75,7 +79,12 @@ cc_test( "@abseil-cpp//absl/status:statusor", "@googletest//:gtest", "@coroutines//:co", - ], + ] + select({ + "//:macos_arm64": ["//plugins:nop_plugin_lib"], + "//:macos_x86_64": ["//plugins:nop_plugin_lib"], + "//:macos_default": ["//plugins:nop_plugin_lib"], + "//conditions:default": [], + }), ) cc_test( diff --git a/client/client_test.cc b/client/client_test.cc index 218878c..136cf49 100644 --- a/client/client_test.cc +++ b/client/client_test.cc @@ -795,7 +795,11 @@ TEST_F(ClientTest, PublishSingleMessageAndReadWithCallback) { } TEST_F(ClientTest, PublishSingleMessageAndReadWithPlugin) { +#ifdef __APPLE__ + ASSERT_OK(Server()->LoadPlugin("NOP", "BUILTIN")); +#else ASSERT_OK(Server()->LoadPlugin("NOP", "plugins/nop_plugin.so")); +#endif subspace::Client pub_client; subspace::Client sub_client; ASSERT_OK(pub_client.Init(Socket())); @@ -4727,6 +4731,104 @@ TEST_F(ClientTest, FreeCreateSubscriberBadSocket) { ASSERT_FALSE(sub_or.ok()); } +// Separate fixture that loads the NOP plugin before the server starts, +// so that OnReady is called during Run() on the scheduler thread. +class PluginTest : public ::testing::Test { +public: + static void SetUpTestSuite() { + printf("Starting Subspace server with NOP plugin\n"); + char socket_name_template[] = "/tmp/subspaceXXXXXX"; // NOLINT + ::close(mkstemp(&socket_name_template[0])); + socket_ = &socket_name_template[0]; + + (void)pipe(server_pipe_); + + server_ = std::make_unique( + scheduler_, socket_, "", 0, 0, + /*local=*/true, server_pipe_[1], /*initial_ordinal=*/1, + /*wait_for_clients=*/true); + +#ifdef __APPLE__ + auto status = server_->LoadPlugin("NOP", "BUILTIN"); +#else + auto status = server_->LoadPlugin("NOP", "plugins/nop_plugin.so"); +#endif + if (!status.ok()) { + fprintf(stderr, "Failed to load NOP plugin: %s\n", + status.ToString().c_str()); + exit(1); + } + + server_thread_ = std::thread([]() { + absl::Status s = server_->Run(); + if (!s.ok()) { + fprintf(stderr, "Error running Subspace server: %s\n", + s.ToString().c_str()); + exit(1); + } + }); + + char buf[8]; + (void)::read(server_pipe_[0], buf, 8); + } + + static void TearDownTestSuite() { + printf("Stopping Subspace server with NOP plugin\n"); + server_->Stop(); + + char buf[8]; + (void)::read(server_pipe_[0], buf, 8); + server_thread_.join(); + server_->CleanupAfterSession(); + (void)remove(socket_.c_str()); + } + + void SetUp() override { signal(SIGPIPE, SIG_IGN); } + + static const std::string &Socket() { return socket_; } + static subspace::Server *Server() { return server_.get(); } + +private: + inline static co::CoroutineScheduler scheduler_; + inline static std::string socket_; + inline static int server_pipe_[2]; + inline static std::unique_ptr server_; + inline static std::thread server_thread_; +}; + +TEST_F(PluginTest, HeartbeatPublishes) { + subspace::Client sub_client; + ASSERT_OK(sub_client.Init(Socket())); + absl::StatusOr sub = + sub_client.CreateSubscriber("/nop/Heartbeat"); + ASSERT_OK(sub); + + constexpr int kExpectedMessages = 2; + int received = 0; + uint64_t prev_seq = 0; + while (received < kExpectedMessages) { + absl::Status wait_status = sub->Wait(std::chrono::seconds(5)); + ASSERT_OK(wait_status) << "Timed out waiting for heartbeat message " + << received + 1; + for (;;) { + absl::StatusOr msg = sub->ReadMessage(); + ASSERT_OK(msg); + if (msg->length == 0) { + break; + } + ASSERT_EQ(sizeof(uint64_t), msg->length); + uint64_t seq; + memcpy(&seq, msg->buffer, sizeof(seq)); + if (received > 0) { + EXPECT_GT(seq, prev_seq); + } + prev_seq = seq; + received++; + } + } + ASSERT_GE(received, kExpectedMessages); +} + int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); absl::ParseCommandLine(argc, argv); diff --git a/client/test_fixture.h b/client/test_fixture.h index 1243de2..e188100 100644 --- a/client/test_fixture.h +++ b/client/test_fixture.h @@ -48,9 +48,10 @@ class SubspaceTestBase : public ::testing::Test { (void)pipe(server_pipe_); - server_ = std::make_unique(scheduler_, socket_, "", 0, 0, - /*local=*/true, - server_pipe_[1]); + server_ = std::make_unique( + scheduler_, socket_, "", 0, 0, + /*local=*/true, server_pipe_[1], /*initial_ordinal=*/1, + /*wait_for_clients=*/true); server_thread_ = std::thread([]() { absl::Status s = server_->Run(); diff --git a/plugins/BUILD.bazel b/plugins/BUILD.bazel index 5cd9aec..26c2bc8 100644 --- a/plugins/BUILD.bazel +++ b/plugins/BUILD.bazel @@ -1,11 +1,12 @@ package(default_visibility = ["//visibility:public"]) -load("@rules_cc//cc:defs.bzl", "cc_binary") +load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library") cc_binary( name = "nop_plugin.so", srcs = ["nop_plugin.cc"], deps = [ + "//client:subspace_client", "//server:server", "@abseil-cpp//absl/status", "@abseil-cpp//absl/strings:str_format", @@ -13,3 +14,19 @@ cc_binary( linkstatic = False, linkshared = True, ) + +# Static library version of the NOP plugin for linking directly into binaries. +# On macOS, dlopen'd plugins get their own copy of thread-local variables +# (like co::self), breaking the coroutine scheduler. Linking the plugin +# statically avoids that by keeping a single copy of the coroutines library. +cc_library( + name = "nop_plugin_lib", + srcs = ["nop_plugin.cc"], + deps = [ + "//client:subspace_client", + "//server:server", + "@abseil-cpp//absl/status", + "@abseil-cpp//absl/strings:str_format", + ], + alwayslink = True, +) diff --git a/plugins/nop_plugin.cc b/plugins/nop_plugin.cc index 1883dd1..1829bdb 100644 --- a/plugins/nop_plugin.cc +++ b/plugins/nop_plugin.cc @@ -1,7 +1,64 @@ +#include "client/client.h" #include "server/server.h" +#include + namespace nop_plugin { +constexpr int kHeartbeatSlotSize = 256; +constexpr int kHeartbeatNumSlots = 16; +constexpr uint64_t kHeartbeatPeriodNs = 1000000000ULL; // 1 second + +void HeartbeatCoroutine(subspace::Server &server, + subspace::PluginContext *ctx) { + if (server.ShuttingDown()) { + return; + } + + subspace::Client client(co::self); + absl::Status status = client.Init(server.GetSocketName()); + if (!status.ok()) { + ctx->logger.Log(toolbelt::LogLevel::kError, + "NOP plugin: failed to init client for heartbeat: %s", + status.ToString().c_str()); + return; + } + + absl::StatusOr pub = client.CreatePublisher( + "/nop/Heartbeat", kHeartbeatSlotSize, kHeartbeatNumSlots); + if (!pub.ok()) { + ctx->logger.Log(toolbelt::LogLevel::kError, + "NOP plugin: failed to create heartbeat publisher: %s", + pub.status().ToString().c_str()); + return; + } + + uint64_t seq = 0; + for (;;) { + int fd = co::Wait(server.GetShutdownTriggerFd(), POLLIN, kHeartbeatPeriodNs); + if (fd != -1) { + break; + } + + absl::StatusOr buffer = pub->GetMessageBuffer(); + if (!buffer.ok()) { + ctx->logger.Log(toolbelt::LogLevel::kError, + "NOP plugin: failed to get heartbeat buffer: %s", + buffer.status().ToString().c_str()); + continue; + } + memcpy(*buffer, &seq, sizeof(seq)); + absl::StatusOr msg = + pub->PublishMessage(sizeof(seq)); + if (!msg.ok()) { + ctx->logger.Log(toolbelt::LogLevel::kError, + "NOP plugin: failed to publish heartbeat: %s", + msg.status().ToString().c_str()); + } + seq++; + } +} + absl::Status OnStartup(subspace::Server &s, const std::string &name, subspace::PluginContext *ctx) { ctx->logger.Log(toolbelt::LogLevel::kInfo, "NOP plugin %s started\n", @@ -10,6 +67,11 @@ absl::Status OnStartup(subspace::Server &s, const std::string &name, } void OnReady(subspace::Server &s, subspace::PluginContext *ctx) { ctx->logger.Log(toolbelt::LogLevel::kInfo, "NOP plugin ready\n"); + + s.GetScheduler().Spawn( + [&s, ctx]() { HeartbeatCoroutine(s, ctx); }, + {.name = "NOP heartbeat", + .interrupt_fd = s.GetShutdownTriggerFd()}); } void OnShutdown(subspace::PluginContext *ctx) { diff --git a/server/plugin.h b/server/plugin.h index 80cb610..7386edf 100644 --- a/server/plugin.h +++ b/server/plugin.h @@ -8,6 +8,10 @@ #include #include +namespace co { +class CoroutineScheduler; +} + namespace subspace { class Server; @@ -15,6 +19,7 @@ struct PluginContext { PluginContext(const std::string &name) : logger(name) {} virtual ~PluginContext() = default; toolbelt::Logger logger; + co::CoroutineScheduler *scheduler = nullptr; }; // Plugins allow an externally loaded module to handle occurences in the @@ -76,6 +81,12 @@ class PluginInterface { functions_.onRemoveSubscriber(s, channel_name, subscriber_id, ctx_.get()); } + void SetScheduler(co::CoroutineScheduler &scheduler) { + if (ctx_) { + ctx_->scheduler = &scheduler; + } + } + private: PluginInterfaceFunctions functions_; std::unique_ptr ctx_; diff --git a/server/server.cc b/server/server.cc index 37f51ae..6a2fca9 100644 --- a/server/server.cc +++ b/server/server.cc @@ -186,7 +186,17 @@ void Server::CloseHandler(ClientHandler *handler) { // UDS and spawns a handler coroutine to handle the communication with // the client. void Server::ListenerCoroutine(toolbelt::UnixSocket &listen_socket) { - while (!shutting_down_) { + for (;;) { + if (!running_) { + // Keep this running until all other coroutines have completed. + // This is to make sure that other coroutines that have publisher + // and subscribers are able to connect to the server and delete them + // on shutdown. + auto strings = scheduler_.AllCoroutineStrings(); + if (strings.size() == 1) { + break; + } + } absl::Status status = HandleIncomingConnection(listen_socket); if (!status.ok()) { logger_.Log(toolbelt::LogLevel::kError, @@ -1895,7 +1905,8 @@ absl::Status Server::LoadPlugin(const std::string &name, std::lock_guard lock(plugin_lock_); void *handle = nullptr; - if (path != "BUILTIN") { + bool builtin = (path == "BUILTIN"); + if (!builtin) { handle = dlopen(path.c_str(), RTLD_LAZY); if (handle == nullptr) { return absl::InternalError( @@ -1904,7 +1915,7 @@ absl::Status Server::LoadPlugin(const std::string &name, } // Form the name of the init function and find it in the shared object. std::string interfaceFunc = absl::StrFormat("%s_Create", name); - void *func = dlsym(handle, interfaceFunc.c_str()); + void *func = dlsym(builtin ? RTLD_DEFAULT : handle, interfaceFunc.c_str()); if (func == nullptr) { return absl::InternalError( absl::StrFormat("Can't find plugin initialization symbol %s: %s", @@ -1920,6 +1931,7 @@ absl::Status Server::LoadPlugin(const std::string &name, if (!status.ok()) { return status; } + interface->SetScheduler(scheduler_); plugins_.push_back(std::make_unique( name, handle, std::unique_ptr(interface))); return absl::OkStatus();