Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/connect/src/inngest/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

logger = structlog.get_logger()

inngest_client = inngest.Inngest(app_id="fast_api_example", logger=logger)
inngest_client = inngest.Inngest(app_id="connect_example", logger=logger)
33 changes: 33 additions & 0 deletions pkg/inngest/inngest/_internal/client_lib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,39 @@ def _get_sync(self, url: str) -> types.MaybeError[httpx.Response]:

return res

async def _post(
self, url: str, body: object
) -> types.MaybeError[httpx.Response]:
"""
Perform an asynchronous HTTP POST request. Handles authn
"""
req = self._http_client.build_request(
"POST",
url,
headers=net.create_headers(
env=self._env,
framework=None,
server_kind=None,
),
json=body,
timeout=self._httpx_timeout,
)

res = await net.fetch_with_auth_fallback(
self._http_client,
self._http_client_sync,
req,
signing_key=self._signing_key,
signing_key_fallback=self._signing_key_fallback,
)
if isinstance(res, Exception):
return res

if res.status_code >= 400:
return Exception(f"HTTP error: {res.status_code} {res.text}")

return res

async def _get_batch(
self, run_id: str
) -> types.MaybeError[list[server_lib.Event]]:
Expand Down
2 changes: 2 additions & 0 deletions pkg/inngest/inngest/_internal/comm_lib/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ async def post(
middleware,
step_lib.StepIDCounter(),
params.step_id,
request.ctx.run_id,
),
),
params.fn_id,
Expand Down Expand Up @@ -224,6 +225,7 @@ async def post(
middleware,
step_lib.StepIDCounter(),
params.step_id,
request.ctx.run_id,
),
),
params.fn_id,
Expand Down
2 changes: 2 additions & 0 deletions pkg/inngest/inngest/_internal/step_lib/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ def __init__(
middleware: middleware_lib.MiddlewareManager,
step_id_counter: StepIDCounter,
target_hashed_id: typing.Optional[str],
run_id: str,
) -> None:
self._client = client
self._middleware = middleware
self._step_id_counter = step_id_counter
self._target_hashed_id = target_hashed_id
self._run_id = run_id

def _handle_skip(
self,
Expand Down
2 changes: 2 additions & 0 deletions pkg/inngest/inngest/_internal/step_lib/step_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ def __init__(
middleware: middleware_lib.MiddlewareManager,
step_id_counter: base.StepIDCounter,
target_hashed_id: typing.Optional[str],
run_id: str,
) -> None:
super().__init__(
client,
middleware,
step_id_counter,
target_hashed_id,
run_id,
)

self.ai = AI(self)
Expand Down
2 changes: 2 additions & 0 deletions pkg/inngest/inngest/_internal/step_lib/step_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ def __init__(
middleware: middleware_lib.MiddlewareManager,
step_id_counter: base.StepIDCounter,
target_hashed_id: typing.Optional[str],
run_id: str,
) -> None:
super().__init__(
client,
middleware,
step_id_counter,
target_hashed_id,
run_id,
)

self.ai = AI(self)
Expand Down
2 changes: 2 additions & 0 deletions pkg/inngest/inngest/experimental/mocked/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def trigger(
middleware,
step_lib.StepIDCounter(),
step_id,
request.ctx.run_id,
),
)

Expand Down Expand Up @@ -133,6 +134,7 @@ def trigger(
middleware,
step_lib.StepIDCounter(),
step_id,
request.ctx.run_id,
),
)

Expand Down
10 changes: 10 additions & 0 deletions pkg/inngest/inngest/experimental/realtime/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
A helper library for realtime publishing

This is an experimental preview support for the Python SDK.
"""

from .publish import publish
from .subscription_tokens import get_subscription_token

__all__ = ["publish", "get_subscription_token"]
45 changes: 45 additions & 0 deletions pkg/inngest/inngest/experimental/realtime/publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import json
import typing
from urllib.parse import urlencode, urljoin

from inngest._internal import errors, step_lib


# Q - Should this be step_publish or realtime publish or otherwise to compare to publish within a step?
# TODO - Support streams
async def publish(
step: step_lib.Step,
channel: str,
topic: str,
data: typing.Mapping[str, object],
) -> None:
"""
Publish a message to a realtime channel.
This currently requires the Step object as an argument as the API is finalized.
"""

params = {
"channel": channel,
"topic": topic,
"run_id": step._run_id,
}

async def _publish_api_request() -> None:
res = await step._client._post(
url=urljoin(
step._client._api_origin,
f"/v1/realtime/publish?{urlencode(params)}",
),
body=data,
)
if isinstance(res, Exception):
raise res
if res.status_code != 200:
raise errors.Error(
"failed to publish to realtime channel",
)
return None

await step.run(f"publish:{channel}", _publish_api_request)

return None
39 changes: 39 additions & 0 deletions pkg/inngest/inngest/experimental/realtime/subscription_tokens.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import typing
from urllib.parse import urljoin

from inngest._internal import client_lib, errors


async def get_subscription_token(
client: client_lib.Inngest, channel: str, topics: list[str]
) -> typing.Mapping[str, object]:
"""
Create a subscription token for a given channel and topics.
The token can be used by a client to subscribe to realtime events,
including front-end applications using the @inngest/realtime npm package.
"""
data = []
for topic in topics:
data.append(
{
"channel": channel,
"topic": topic,
"kind": "run",
}
)

res = await client._post(
url=urljoin(
client._api_origin,
"/v1/realtime/token",
),
body=data,
)
if isinstance(res, Exception):
raise res
if res.status_code >= 300:
raise errors.Error(
"failed to get subscription token",
)
# Response is an object with a "jwt" property which is a string
return res.json()
Loading