Fastlane JS channel events (#993)

* Fastlane JS channel events

* Abstract fastlaning to separate concerns
This commit is contained in:
Jonatan Kłosko 2022-02-09 23:15:03 +01:00 committed by GitHub
parent 19b777eb4e
commit 7f19afe7af
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 87 additions and 37 deletions

View file

@ -1312,29 +1312,74 @@ defmodule Livebook.Session do
@doc """
Subscribes the caller to runtime messages under the given topic.
Broadcasted events are encoded using `encoder`, if successful,
the message is sent directly to `receiver_pid`, otherwise an
`{:encoding_error, error, message}` is sent to the caller.
"""
@spec subscribe_to_runtime_events(id(), String.t(), String.t()) :: :ok | {:error, term()}
def subscribe_to_runtime_events(session_id, topic, subtopic) do
Phoenix.PubSub.subscribe(Livebook.PubSub, runtime_messages_topic(session_id, topic, subtopic))
@spec subscribe_to_runtime_events(
id(),
String.t(),
String.t(),
(term() -> {:ok, term()} | {:error, term()}),
pid()
) :: :ok | {:error, term()}
def subscribe_to_runtime_events(session_id, topic, subtopic, encoder, receiver_pid) do
full_topic = runtime_messages_topic(session_id, topic, subtopic)
Phoenix.PubSub.subscribe(Livebook.PubSub, full_topic, metadata: {encoder, receiver_pid})
end
@doc """
Unsubscribes the caller from runtime messages subscribed earlier
with `subscribe_to_runtime_events/3`.
with `subscribe_to_runtime_events/5`.
"""
@spec unsubscribe_from_runtime_events(id(), String.t(), String.t()) :: :ok | {:error, term()}
def unsubscribe_from_runtime_events(session_id, topic, subtopic) do
Phoenix.PubSub.unsubscribe(
Livebook.PubSub,
runtime_messages_topic(session_id, topic, subtopic)
)
full_topic = runtime_messages_topic(session_id, topic, subtopic)
Phoenix.PubSub.unsubscribe(Livebook.PubSub, full_topic)
end
@doc false
def runtime_messages_topic(session_id, topic, subtopic) do
def broadcast_runtime_event(session_id, topic, subtopic, message) do
full_topic = runtime_messages_topic(session_id, topic, subtopic)
Phoenix.PubSub.broadcast(Livebook.PubSub, full_topic, message, __MODULE__)
end
defp runtime_messages_topic(session_id, topic, subtopic) do
"sessions:#{session_id}:runtime_messages:#{topic}:#{subtopic}"
end
@doc false
# Custom dispatcher for broadcasting runtime events
def dispatch(subscribers, from, message) do
Enum.reduce(subscribers, %{}, fn
{pid, _}, cache when pid == from ->
cache
{pid, {encoder, receiver_pid}}, cache ->
case cache do
%{^encoder => encoded_message} ->
send(receiver_pid, encoded_message)
cache
%{} ->
case encoder.(message) do
{:ok, encoded_message} ->
send(receiver_pid, encoded_message)
Map.put(cache, encoder, encoded_message)
{:error, error} ->
send(pid, {:encoding_error, error, message})
cache
end
end
{pid, _}, cache ->
send(pid, message)
cache
end)
end
@doc """
Determines locator of the evaluation that the given
cell depends on.

View file

@ -21,8 +21,7 @@ defmodule Livebook.Session.Worker do
@impl true
def handle_info({:runtime_broadcast, topic, subtopic, message}, state) do
full_topic = Livebook.Session.runtime_messages_topic(state.session_id, topic, subtopic)
Phoenix.PubSub.broadcast(Livebook.PubSub, full_topic, message)
Livebook.Session.broadcast_runtime_event(state.session_id, topic, subtopic, message)
{:noreply, state}
end
end

View file

@ -18,7 +18,13 @@ defmodule LivebookWeb.JSOutputChannel do
socket = assign(socket, ref_with_pid: ref_with_pid, ref_with_count: ref_with_count)
if socket.assigns.ref_with_count[ref] == 1 do
Livebook.Session.subscribe_to_runtime_events(socket.assigns.session_id, "js_live", ref)
Livebook.Session.subscribe_to_runtime_events(
socket.assigns.session_id,
"js_live",
ref,
&fastlane_encoder/1,
socket.transport_pid
)
end
{:noreply, socket}
@ -52,8 +58,8 @@ defmodule LivebookWeb.JSOutputChannel do
end
@impl true
def handle_info({:connect_reply, data, %{ref: ref}}, socket) do
with {:error, error} <- try_push(socket, "init:#{ref}", nil, data) do
def handle_info({:connect_reply, payload, %{ref: ref}}, socket) do
with {:error, error} <- try_push(socket, "init:#{ref}", nil, payload) do
message = "Failed to serialize initial widget data, " <> error
push(socket, "error:#{ref}", %{"message" => message})
end
@ -61,20 +67,24 @@ defmodule LivebookWeb.JSOutputChannel do
{:noreply, socket}
end
def handle_info({:event, event, payload, %{ref: ref}}, socket) do
with {:error, error} <- try_push(socket, "event:#{ref}", [event], payload) do
message = "Failed to serialize event payload, " <> error
push(socket, "error:#{ref}", %{"message" => message})
end
def handle_info({:encoding_error, error, {:event, _event, _payload, %{ref: ref}}}, socket) do
message = "Failed to serialize widget data, " <> error
push(socket, "error:#{ref}", %{"message" => message})
{:noreply, socket}
end
# In case the payload fails to encode we catch the error
defp try_push(socket, event, meta, payload) do
with {:ok, _} <-
run_safely(fn ->
push(socket, event, transport_encode!(meta, payload))
end),
do: :ok
end
# In case the payload fails to encode we catch the error
defp run_safely(fun) do
try do
payload = transport_encode!(meta, payload)
push(socket, event, payload)
{:ok, fun.()}
catch
:error, %Protocol.UndefinedError{protocol: Jason.Encoder, value: value} ->
{:error, "value #{inspect(value)} is not JSON-serializable, use another data type"}
@ -84,6 +94,16 @@ defmodule LivebookWeb.JSOutputChannel do
end
end
defp fastlane_encoder({:event, event, payload, %{ref: ref}}) do
run_safely(fn ->
Phoenix.Socket.V2.JSONSerializer.fastlane!(%Phoenix.Socket.Broadcast{
topic: "js_output",
event: "event:#{ref}",
payload: transport_encode!([event], payload)
})
end)
end
# A user payload can be either a JSON-serializable term
# or a {:binary, info, binary} tuple, where info is a
# JSON-serializable term. The channel allows for sending

View file

@ -23,12 +23,6 @@ defmodule LivebookWeb.JSOutputChannelTest do
assert_push "init:1", %{"root" => [nil, [1, 2, 3]]}
end
test "sends events received from widget server to the client", %{socket: socket} do
send(socket.channel_pid, {:event, "ping", [1, 2, 3], %{ref: "1"}})
assert_push "event:1", %{"root" => [["ping"], [1, 2, 3]]}
end
test "sends client events to the corresponding widget server", %{socket: socket} do
push(socket, "connect", %{"session_token" => session_token(), "ref" => "1"})
@ -51,14 +45,6 @@ defmodule LivebookWeb.JSOutputChannelTest do
assert_push "init:1", {:binary, <<24::size(32), "[null,{\"message\":\"hey\"}]", 1, 2, 3>>}
end
test "from server to client", %{socket: socket} do
payload = {:binary, %{message: "hey"}, <<1, 2, 3>>}
send(socket.channel_pid, {:event, "ping", payload, %{ref: "1"}})
assert_push "event:1",
{:binary, <<28::size(32), "[[\"ping\"],{\"message\":\"hey\"}]", 1, 2, 3>>}
end
test "form client to server", %{socket: socket} do
push(socket, "connect", %{"session_token" => session_token(), "ref" => "1"})