diff --git a/lib/livebook/runtime/evaluator/io_proxy.ex b/lib/livebook/runtime/evaluator/io_proxy.ex index 82c75a0d5..ef0d25e0b 100644 --- a/lib/livebook/runtime/evaluator/io_proxy.ex +++ b/lib/livebook/runtime/evaluator/io_proxy.ex @@ -245,6 +245,12 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do {:ok, state} end + defp io_request({:livebook_put_output_to, client_id, output}, state) do + state = flush_buffer(state) + send(state.send_to, {:runtime_evaluation_output_to, client_id, state.ref, output}) + {:ok, state} + end + defp io_request({:livebook_get_input_value, input_id}, state) do input_cache = Map.put_new_lazy(state.input_cache, input_id, fn -> diff --git a/lib/livebook/session.ex b/lib/livebook/session.ex index 37969eb84..70541f673 100644 --- a/lib/livebook/session.ex +++ b/lib/livebook/session.ex @@ -1054,6 +1054,20 @@ defmodule Livebook.Session do {:noreply, handle_operation(state, operation)} end + def handle_info({:runtime_evaluation_output_to, client_id, cell_id, output}, state) do + client_pid = + Enum.find_value(state.client_pids_with_id, fn {pid, id} -> + id == client_id && pid + end) + + if client_pid do + operation = {:add_cell_evaluation_output, @client_id, cell_id, output} + send(client_pid, {:operation, operation}) + end + + {:noreply, state} + end + def handle_info({:runtime_evaluation_response, cell_id, response, metadata}, state) do {memory_usage, metadata} = Map.pop(metadata, :memory_usage) operation = {:add_cell_evaluation_response, @client_id, cell_id, response, metadata} diff --git a/test/livebook/runtime/evaluator/io_proxy_test.exs b/test/livebook/runtime/evaluator/io_proxy_test.exs index 5983334a6..94877cdb1 100644 --- a/test/livebook/runtime/evaluator/io_proxy_test.exs +++ b/test/livebook/runtime/evaluator/io_proxy_test.exs @@ -104,6 +104,12 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do assert_received {:runtime_evaluation_output, :ref, {:text, "[1, 2, 3]"}} end + test "supports direct livebook output forwarding for a specific client", %{io: io} do + livebook_put_output_to(io, "client1", {:text, "[1, 2, 3]"}) + + assert_received {:runtime_evaluation_output_to, "client1", :ref, {:text, "[1, 2, 3]"}} + end + describe "token requests" do test "returns different tokens for subsequent calls", %{io: io} do IOProxy.configure(io, :ref1, "cell1") @@ -155,6 +161,10 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do io_request(io, {:livebook_put_output, output}) end + defp livebook_put_output_to(io, client_id, output) do + io_request(io, {:livebook_put_output_to, client_id, output}) + end + defp livebook_get_input_value(io, input_id) do io_request(io, {:livebook_get_input_value, input_id}) end diff --git a/test/livebook_web/live/session_live_test.exs b/test/livebook_web/live/session_live_test.exs index a98a5036d..675d294fe 100644 --- a/test/livebook_web/live/session_live_test.exs +++ b/test/livebook_web/live/session_live_test.exs @@ -476,6 +476,30 @@ defmodule LivebookWeb.SessionLiveTest do assert content =~ "Updated frame" refute content =~ "In frame" end + + test "client specific output is sent only to one target", %{conn: conn, session: session} do + user1 = build(:user, name: "Jake Peralta") + {_, client_id} = Session.register_client(session.pid, self(), user1) + + Session.subscribe(session.id) + evaluate_setup(session.pid) + + section_id = insert_section(session.pid) + cell_id = insert_text_cell(session.pid, section_id, :code) + + Session.queue_cell_evaluation(session.pid, cell_id) + + send( + session.pid, + {:runtime_evaluation_output_to, client_id, cell_id, {:stdout, "line 1\n"}} + ) + + assert_receive {:operation, + {:add_cell_evaluation_output, _, ^cell_id, {:stdout, "line 1\n"}}} + + {:ok, view, _} = live(conn, "/sessions/#{session.id}") + refute render(view) =~ "line 1" + end end describe "smart cells" do