diff --git a/assets/js/js_output/index.js b/assets/js/js_output/index.js index f612d17ca..76cf4cd86 100644 --- a/assets/js/js_output/index.js +++ b/assets/js/js_output/index.js @@ -8,9 +8,9 @@ const global = { }; // Returns channel responsible for JS communication in the current session -function getChannel(socket, { create = true } = {}) { +function getChannel(socket, sessionId, { create = true } = {}) { if (!global.channel && create) { - global.channel = socket.channel("js_output", {}); + global.channel = socket.channel("js_output", { session_id: sessionId }); global.channel.join(); } @@ -64,7 +64,10 @@ const JSOutput = { errorContainer: null, }; - const channel = getChannel(this.__liveSocket.getSocket()); + const channel = getChannel( + this.__liveSocket.getSocket(), + this.props.sessionId + ); const iframePlaceholder = document.createElement("div"); const iframe = document.createElement("iframe"); @@ -240,9 +243,13 @@ const JSOutput = { this.intersectionObserver.disconnect(); this.state.iframe.remove(); - const channel = getChannel(this.__liveSocket.getSocket(), { - create: false, - }); + const channel = getChannel( + this.__liveSocket.getSocket(), + this.props.sessionId, + { + create: false, + } + ); if (channel) { this.state.channelUnsubscribe(); @@ -257,6 +264,7 @@ function getProps(hook) { assetsBaseUrl: getAttributeOrThrow(hook.el, "data-assets-base-url"), jsPath: getAttributeOrThrow(hook.el, "data-js-path"), sessionToken: getAttributeOrThrow(hook.el, "data-session-token"), + sessionId: getAttributeOrThrow(hook.el, "data-session-id"), }; } diff --git a/lib/livebook/evaluator/io_proxy.ex b/lib/livebook/evaluator/io_proxy.ex index c34fe76a4..b2d3f0444 100644 --- a/lib/livebook/evaluator/io_proxy.ex +++ b/lib/livebook/evaluator/io_proxy.ex @@ -238,6 +238,10 @@ defmodule Livebook.Evaluator.IOProxy do {reply, state} end + defp io_request(:livebook_get_broadcast_target, state) do + {{:ok, state.target}, state} + end + defp io_request(_, state) do {{:error, :request}, state} end diff --git a/lib/livebook/session.ex b/lib/livebook/session.ex index b24f9dda6..088572f2c 100644 --- a/lib/livebook/session.ex +++ b/lib/livebook/session.ex @@ -782,6 +782,12 @@ defmodule Livebook.Session do {:noreply, state} end + def handle_info({:runtime_broadcast, topic, subtopic, message}, state) do + full_topic = runtime_messages_topic(state.session_id, topic, subtopic) + Phoenix.PubSub.broadcast(Livebook.PubSub, full_topic, message) + {:noreply, state} + end + def handle_info({:container_down, container_ref, message}, state) do broadcast_error(state.session_id, "evaluation process terminated - #{message}") @@ -1221,6 +1227,30 @@ defmodule Livebook.Session do :ok = :erl_tar.extract({:binary, binary}, [:compressed, {:cwd, path}]) end + @doc """ + Subscribes the caller to runtime messages under the given topic. + """ + @spec subscribe_to_runtime_events(id(), String.t(), String.t()) :: :ok | {:error, term()} + def subscribe_to_runtime_events(session_id, topic, subtopic) do + Phoenix.PubSub.subscribe(Livebook.PubSub, runtime_messages_topic(session_id, topic, subtopic)) + end + + @doc """ + Unsubscribes the caller from runtime messages subscribed earlier + with `subscribe_to_runtime_events/3`. + """ + @spec unsubscribe_from_runtime_events(id(), String.t(), String.t()) :: :ok | {:error, term()} + def unsubscribe_from_runtime_events(session_id, topic, subtopic) do + Phoenix.PubSub.unsubscribe( + Livebook.PubSub, + runtime_messages_topic(session_id, topic, subtopic) + ) + end + + defp runtime_messages_topic(session_id, topic, subtopic) do + "sessions:#{session_id}:runtime_messages:#{topic}:#{subtopic}" + end + @doc """ Determines locator of the evaluation that the given cell depends on. diff --git a/lib/livebook_web/channels/js_output_channel.ex b/lib/livebook_web/channels/js_output_channel.ex index bffea7d4a..3469f1b02 100644 --- a/lib/livebook_web/channels/js_output_channel.ex +++ b/lib/livebook_web/channels/js_output_channel.ex @@ -2,8 +2,8 @@ defmodule LivebookWeb.JSOutputChannel do use Phoenix.Channel @impl true - def join("js_output", %{}, socket) do - {:ok, assign(socket, ref_with_pid: %{}, ref_with_count: %{})} + def join("js_output", %{"session_id" => session_id}, socket) do + {:ok, assign(socket, session_id: session_id, ref_with_pid: %{}, ref_with_count: %{})} end @impl true @@ -16,6 +16,11 @@ defmodule LivebookWeb.JSOutputChannel do ref_with_pid = Map.put(socket.assigns.ref_with_pid, ref, pid) ref_with_count = Map.update(socket.assigns.ref_with_count, ref, 1, &(&1 + 1)) socket = assign(socket, ref_with_pid: ref_with_pid, ref_with_count: ref_with_count) + + if socket.assigns.ref_with_count[ref] == 1 do + Livebook.Session.subscribe_to_runtime_events(socket.assigns.session_id, "js_live", ref) + end + {:noreply, socket} end @@ -28,6 +33,12 @@ defmodule LivebookWeb.JSOutputChannel do def handle_in("disconnect", %{"ref" => ref}, socket) do socket = if socket.assigns.ref_with_count[ref] == 1 do + Livebook.Session.unsubscribe_from_runtime_events( + socket.assigns.session_id, + "js_live", + ref + ) + {_, ref_with_count} = Map.pop!(socket.assigns.ref_with_count, ref) {_, ref_with_pid} = Map.pop!(socket.assigns.ref_with_pid, ref) assign(socket, ref_with_count: ref_with_count, ref_with_pid: ref_with_pid) diff --git a/lib/livebook_web/live/output/js_component.ex b/lib/livebook_web/live/output/js_component.ex index 80a631821..5274cc680 100644 --- a/lib/livebook_web/live/output/js_component.ex +++ b/lib/livebook_web/live/output/js_component.ex @@ -10,7 +10,8 @@ defmodule LivebookWeb.Output.JSComponent do data-ref={@info.ref} data-assets-base-url={Routes.session_url(@socket, :show_asset, @session_id, @info.assets.hash, [])} data-js-path={@info.assets.js_path} - data-session-token={session_token(@info.pid)}> + data-session-token={session_token(@info.pid)} + data-session-id={@session_id}> """ end diff --git a/test/livebook/unique_task_test.exs b/test/livebook/unique_task_test.exs index 9b0287a0c..2ed1b6ec8 100644 --- a/test/livebook/unique_task_test.exs +++ b/test/livebook/unique_task_test.exs @@ -54,8 +54,8 @@ defmodule Livebook.UniqueTaskTest do send(parent, {:result2, result}) end) - assert_receive {:ping_from_task, task1_pid}, 200 - assert_receive {:ping_from_task, task2_pid}, 200 + assert_receive {:ping_from_task, task1_pid}, 2000 + assert_receive {:ping_from_task, task2_pid}, 2000 send(task1_pid, :pong) send(task2_pid, :pong) diff --git a/test/livebook_web/channels/js_output_channel_test.exs b/test/livebook_web/channels/js_output_channel_test.exs index dce574b1c..41cc0e1db 100644 --- a/test/livebook_web/channels/js_output_channel_test.exs +++ b/test/livebook_web/channels/js_output_channel_test.exs @@ -1,11 +1,15 @@ defmodule LivebookWeb.JSOutputChannelTest do - use LivebookWeb.ChannelCase + use LivebookWeb.ChannelCase, async: true setup do + session_id = Livebook.Utils.random_node_aware_id() + {:ok, _, socket} = LivebookWeb.Socket |> socket() - |> subscribe_and_join(LivebookWeb.JSOutputChannel, "js_output") + |> subscribe_and_join(LivebookWeb.JSOutputChannel, "js_output", %{ + "session_id" => session_id + }) %{socket: socket} end