diff --git a/lib/livebook/session.ex b/lib/livebook/session.ex index ba06906a0..0331650e5 100644 --- a/lib/livebook/session.ex +++ b/lib/livebook/session.ex @@ -1312,29 +1312,74 @@ defmodule Livebook.Session do @doc """ Subscribes the caller to runtime messages under the given topic. + + Broadcasted events are encoded using `encoder`, if successful, + the message is sent directly to `receiver_pid`, otherwise an + `{:encoding_error, error, message}` is sent to the caller. """ - @spec subscribe_to_runtime_events(id(), String.t(), String.t()) :: :ok | {:error, term()} - def subscribe_to_runtime_events(session_id, topic, subtopic) do - Phoenix.PubSub.subscribe(Livebook.PubSub, runtime_messages_topic(session_id, topic, subtopic)) + @spec subscribe_to_runtime_events( + id(), + String.t(), + String.t(), + (term() -> {:ok, term()} | {:error, term()}), + pid() + ) :: :ok | {:error, term()} + def subscribe_to_runtime_events(session_id, topic, subtopic, encoder, receiver_pid) do + full_topic = runtime_messages_topic(session_id, topic, subtopic) + Phoenix.PubSub.subscribe(Livebook.PubSub, full_topic, metadata: {encoder, receiver_pid}) end @doc """ Unsubscribes the caller from runtime messages subscribed earlier - with `subscribe_to_runtime_events/3`. + with `subscribe_to_runtime_events/5`. """ @spec unsubscribe_from_runtime_events(id(), String.t(), String.t()) :: :ok | {:error, term()} def unsubscribe_from_runtime_events(session_id, topic, subtopic) do - Phoenix.PubSub.unsubscribe( - Livebook.PubSub, - runtime_messages_topic(session_id, topic, subtopic) - ) + full_topic = runtime_messages_topic(session_id, topic, subtopic) + Phoenix.PubSub.unsubscribe(Livebook.PubSub, full_topic) end @doc false - def runtime_messages_topic(session_id, topic, subtopic) do + def broadcast_runtime_event(session_id, topic, subtopic, message) do + full_topic = runtime_messages_topic(session_id, topic, subtopic) + Phoenix.PubSub.broadcast(Livebook.PubSub, full_topic, message, __MODULE__) + end + + defp runtime_messages_topic(session_id, topic, subtopic) do "sessions:#{session_id}:runtime_messages:#{topic}:#{subtopic}" end + @doc false + # Custom dispatcher for broadcasting runtime events + def dispatch(subscribers, from, message) do + Enum.reduce(subscribers, %{}, fn + {pid, _}, cache when pid == from -> + cache + + {pid, {encoder, receiver_pid}}, cache -> + case cache do + %{^encoder => encoded_message} -> + send(receiver_pid, encoded_message) + cache + + %{} -> + case encoder.(message) do + {:ok, encoded_message} -> + send(receiver_pid, encoded_message) + Map.put(cache, encoder, encoded_message) + + {:error, error} -> + send(pid, {:encoding_error, error, message}) + cache + end + end + + {pid, _}, cache -> + send(pid, message) + cache + end) + end + @doc """ Determines locator of the evaluation that the given cell depends on. diff --git a/lib/livebook/session/worker.ex b/lib/livebook/session/worker.ex index f7dcc1748..3786bffe1 100644 --- a/lib/livebook/session/worker.ex +++ b/lib/livebook/session/worker.ex @@ -21,8 +21,7 @@ defmodule Livebook.Session.Worker do @impl true def handle_info({:runtime_broadcast, topic, subtopic, message}, state) do - full_topic = Livebook.Session.runtime_messages_topic(state.session_id, topic, subtopic) - Phoenix.PubSub.broadcast(Livebook.PubSub, full_topic, message) + Livebook.Session.broadcast_runtime_event(state.session_id, topic, subtopic, message) {:noreply, state} end end diff --git a/lib/livebook_web/channels/js_output_channel.ex b/lib/livebook_web/channels/js_output_channel.ex index 867334518..81aaff3ba 100644 --- a/lib/livebook_web/channels/js_output_channel.ex +++ b/lib/livebook_web/channels/js_output_channel.ex @@ -18,7 +18,13 @@ defmodule LivebookWeb.JSOutputChannel do socket = assign(socket, ref_with_pid: ref_with_pid, ref_with_count: ref_with_count) if socket.assigns.ref_with_count[ref] == 1 do - Livebook.Session.subscribe_to_runtime_events(socket.assigns.session_id, "js_live", ref) + Livebook.Session.subscribe_to_runtime_events( + socket.assigns.session_id, + "js_live", + ref, + &fastlane_encoder/1, + socket.transport_pid + ) end {:noreply, socket} @@ -52,8 +58,8 @@ defmodule LivebookWeb.JSOutputChannel do end @impl true - def handle_info({:connect_reply, data, %{ref: ref}}, socket) do - with {:error, error} <- try_push(socket, "init:#{ref}", nil, data) do + def handle_info({:connect_reply, payload, %{ref: ref}}, socket) do + with {:error, error} <- try_push(socket, "init:#{ref}", nil, payload) do message = "Failed to serialize initial widget data, " <> error push(socket, "error:#{ref}", %{"message" => message}) end @@ -61,20 +67,24 @@ defmodule LivebookWeb.JSOutputChannel do {:noreply, socket} end - def handle_info({:event, event, payload, %{ref: ref}}, socket) do - with {:error, error} <- try_push(socket, "event:#{ref}", [event], payload) do - message = "Failed to serialize event payload, " <> error - push(socket, "error:#{ref}", %{"message" => message}) - end - + def handle_info({:encoding_error, error, {:event, _event, _payload, %{ref: ref}}}, socket) do + message = "Failed to serialize widget data, " <> error + push(socket, "error:#{ref}", %{"message" => message}) {:noreply, socket} end - # In case the payload fails to encode we catch the error defp try_push(socket, event, meta, payload) do + with {:ok, _} <- + run_safely(fn -> + push(socket, event, transport_encode!(meta, payload)) + end), + do: :ok + end + + # In case the payload fails to encode we catch the error + defp run_safely(fun) do try do - payload = transport_encode!(meta, payload) - push(socket, event, payload) + {:ok, fun.()} catch :error, %Protocol.UndefinedError{protocol: Jason.Encoder, value: value} -> {:error, "value #{inspect(value)} is not JSON-serializable, use another data type"} @@ -84,6 +94,16 @@ defmodule LivebookWeb.JSOutputChannel do end end + defp fastlane_encoder({:event, event, payload, %{ref: ref}}) do + run_safely(fn -> + Phoenix.Socket.V2.JSONSerializer.fastlane!(%Phoenix.Socket.Broadcast{ + topic: "js_output", + event: "event:#{ref}", + payload: transport_encode!([event], payload) + }) + end) + end + # A user payload can be either a JSON-serializable term # or a {:binary, info, binary} tuple, where info is a # JSON-serializable term. The channel allows for sending diff --git a/test/livebook_web/channels/js_output_channel_test.exs b/test/livebook_web/channels/js_output_channel_test.exs index 002db2ed3..761ec4499 100644 --- a/test/livebook_web/channels/js_output_channel_test.exs +++ b/test/livebook_web/channels/js_output_channel_test.exs @@ -23,12 +23,6 @@ defmodule LivebookWeb.JSOutputChannelTest do assert_push "init:1", %{"root" => [nil, [1, 2, 3]]} end - test "sends events received from widget server to the client", %{socket: socket} do - send(socket.channel_pid, {:event, "ping", [1, 2, 3], %{ref: "1"}}) - - assert_push "event:1", %{"root" => [["ping"], [1, 2, 3]]} - end - test "sends client events to the corresponding widget server", %{socket: socket} do push(socket, "connect", %{"session_token" => session_token(), "ref" => "1"}) @@ -51,14 +45,6 @@ defmodule LivebookWeb.JSOutputChannelTest do assert_push "init:1", {:binary, <<24::size(32), "[null,{\"message\":\"hey\"}]", 1, 2, 3>>} end - test "from server to client", %{socket: socket} do - payload = {:binary, %{message: "hey"}, <<1, 2, 3>>} - send(socket.channel_pid, {:event, "ping", payload, %{ref: "1"}}) - - assert_push "event:1", - {:binary, <<28::size(32), "[[\"ping\"],{\"message\":\"hey\"}]", 1, 2, 3>>} - end - test "form client to server", %{socket: socket} do push(socket, "connect", %{"session_token" => session_token(), "ref" => "1"})