Introduce broadcast communication from runtime ()

* Introduce broadcast communication from runtime

* Return broadcast target from group leader

* Increase timeout

* Make the channel test async

* Decouple base topic and ref
This commit is contained in:
Jonatan Kłosko 2022-01-10 18:38:08 +01:00 committed by GitHub
parent 618593158d
commit 118cf05d0a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 71 additions and 13 deletions
assets/js/js_output
lib
livebook
livebook_web
test

View file

@ -8,9 +8,9 @@ const global = {
};
// Returns channel responsible for JS communication in the current session
function getChannel(socket, { create = true } = {}) {
function getChannel(socket, sessionId, { create = true } = {}) {
if (!global.channel && create) {
global.channel = socket.channel("js_output", {});
global.channel = socket.channel("js_output", { session_id: sessionId });
global.channel.join();
}
@ -64,7 +64,10 @@ const JSOutput = {
errorContainer: null,
};
const channel = getChannel(this.__liveSocket.getSocket());
const channel = getChannel(
this.__liveSocket.getSocket(),
this.props.sessionId
);
const iframePlaceholder = document.createElement("div");
const iframe = document.createElement("iframe");
@ -240,9 +243,13 @@ const JSOutput = {
this.intersectionObserver.disconnect();
this.state.iframe.remove();
const channel = getChannel(this.__liveSocket.getSocket(), {
create: false,
});
const channel = getChannel(
this.__liveSocket.getSocket(),
this.props.sessionId,
{
create: false,
}
);
if (channel) {
this.state.channelUnsubscribe();
@ -257,6 +264,7 @@ function getProps(hook) {
assetsBaseUrl: getAttributeOrThrow(hook.el, "data-assets-base-url"),
jsPath: getAttributeOrThrow(hook.el, "data-js-path"),
sessionToken: getAttributeOrThrow(hook.el, "data-session-token"),
sessionId: getAttributeOrThrow(hook.el, "data-session-id"),
};
}

View file

@ -238,6 +238,10 @@ defmodule Livebook.Evaluator.IOProxy do
{reply, state}
end
defp io_request(:livebook_get_broadcast_target, state) do
{{:ok, state.target}, state}
end
defp io_request(_, state) do
{{:error, :request}, state}
end

View file

@ -782,6 +782,12 @@ defmodule Livebook.Session do
{:noreply, state}
end
def handle_info({:runtime_broadcast, topic, subtopic, message}, state) do
full_topic = runtime_messages_topic(state.session_id, topic, subtopic)
Phoenix.PubSub.broadcast(Livebook.PubSub, full_topic, message)
{:noreply, state}
end
def handle_info({:container_down, container_ref, message}, state) do
broadcast_error(state.session_id, "evaluation process terminated - #{message}")
@ -1221,6 +1227,30 @@ defmodule Livebook.Session do
:ok = :erl_tar.extract({:binary, binary}, [:compressed, {:cwd, path}])
end
@doc """
Subscribes the caller to runtime messages under the given topic.
"""
@spec subscribe_to_runtime_events(id(), String.t(), String.t()) :: :ok | {:error, term()}
def subscribe_to_runtime_events(session_id, topic, subtopic) do
Phoenix.PubSub.subscribe(Livebook.PubSub, runtime_messages_topic(session_id, topic, subtopic))
end
@doc """
Unsubscribes the caller from runtime messages subscribed earlier
with `subscribe_to_runtime_events/3`.
"""
@spec unsubscribe_from_runtime_events(id(), String.t(), String.t()) :: :ok | {:error, term()}
def unsubscribe_from_runtime_events(session_id, topic, subtopic) do
Phoenix.PubSub.unsubscribe(
Livebook.PubSub,
runtime_messages_topic(session_id, topic, subtopic)
)
end
defp runtime_messages_topic(session_id, topic, subtopic) do
"sessions:#{session_id}:runtime_messages:#{topic}:#{subtopic}"
end
@doc """
Determines locator of the evaluation that the given
cell depends on.

View file

@ -2,8 +2,8 @@ defmodule LivebookWeb.JSOutputChannel do
use Phoenix.Channel
@impl true
def join("js_output", %{}, socket) do
{:ok, assign(socket, ref_with_pid: %{}, ref_with_count: %{})}
def join("js_output", %{"session_id" => session_id}, socket) do
{:ok, assign(socket, session_id: session_id, ref_with_pid: %{}, ref_with_count: %{})}
end
@impl true
@ -16,6 +16,11 @@ defmodule LivebookWeb.JSOutputChannel do
ref_with_pid = Map.put(socket.assigns.ref_with_pid, ref, pid)
ref_with_count = Map.update(socket.assigns.ref_with_count, ref, 1, &(&1 + 1))
socket = assign(socket, ref_with_pid: ref_with_pid, ref_with_count: ref_with_count)
if socket.assigns.ref_with_count[ref] == 1 do
Livebook.Session.subscribe_to_runtime_events(socket.assigns.session_id, "js_live", ref)
end
{:noreply, socket}
end
@ -28,6 +33,12 @@ defmodule LivebookWeb.JSOutputChannel do
def handle_in("disconnect", %{"ref" => ref}, socket) do
socket =
if socket.assigns.ref_with_count[ref] == 1 do
Livebook.Session.unsubscribe_from_runtime_events(
socket.assigns.session_id,
"js_live",
ref
)
{_, ref_with_count} = Map.pop!(socket.assigns.ref_with_count, ref)
{_, ref_with_pid} = Map.pop!(socket.assigns.ref_with_pid, ref)
assign(socket, ref_with_count: ref_with_count, ref_with_pid: ref_with_pid)

View file

@ -10,7 +10,8 @@ defmodule LivebookWeb.Output.JSComponent do
data-ref={@info.ref}
data-assets-base-url={Routes.session_url(@socket, :show_asset, @session_id, @info.assets.hash, [])}
data-js-path={@info.assets.js_path}
data-session-token={session_token(@info.pid)}>
data-session-token={session_token(@info.pid)}
data-session-id={@session_id}>
</div>
"""
end

View file

@ -54,8 +54,8 @@ defmodule Livebook.UniqueTaskTest do
send(parent, {:result2, result})
end)
assert_receive {:ping_from_task, task1_pid}, 200
assert_receive {:ping_from_task, task2_pid}, 200
assert_receive {:ping_from_task, task1_pid}, 2000
assert_receive {:ping_from_task, task2_pid}, 2000
send(task1_pid, :pong)
send(task2_pid, :pong)

View file

@ -1,11 +1,15 @@
defmodule LivebookWeb.JSOutputChannelTest do
use LivebookWeb.ChannelCase
use LivebookWeb.ChannelCase, async: true
setup do
session_id = Livebook.Utils.random_node_aware_id()
{:ok, _, socket} =
LivebookWeb.Socket
|> socket()
|> subscribe_and_join(LivebookWeb.JSOutputChannel, "js_output")
|> subscribe_and_join(LivebookWeb.JSOutputChannel, "js_output", %{
"session_id" => session_id
})
%{socket: socket}
end