2022-02-28 20:53:33 +08:00
|
|
|
defmodule LivebookWeb.JSViewChannel do
|
2022-01-06 23:31:26 +08:00
|
|
|
use Phoenix.Channel
|
|
|
|
|
|
|
|
@impl true
|
2022-02-28 20:53:33 +08:00
|
|
|
def join("js_view", %{"session_id" => session_id}, socket) do
|
2022-03-23 01:25:42 +08:00
|
|
|
{:ok, assign(socket, session_id: session_id, ref_with_info: %{})}
|
2022-01-06 23:31:26 +08:00
|
|
|
end
|
|
|
|
|
|
|
|
@impl true
|
2022-03-23 01:25:42 +08:00
|
|
|
def handle_in("connect", %{"session_token" => session_token, "ref" => ref, "id" => id}, socket) do
|
2022-02-28 20:53:33 +08:00
|
|
|
{:ok, data} = Phoenix.Token.verify(LivebookWeb.Endpoint, "js view", session_token)
|
2022-01-06 23:31:26 +08:00
|
|
|
%{pid: pid} = data
|
|
|
|
|
|
|
|
send(pid, {:connect, self(), %{origin: self(), ref: ref}})
|
|
|
|
|
2022-03-23 01:25:42 +08:00
|
|
|
socket =
|
|
|
|
update_in(socket.assigns.ref_with_info[ref], fn
|
|
|
|
nil -> %{pid: pid, count: 1, connect_queue: [id]}
|
|
|
|
info -> %{info | count: info.count + 1, connect_queue: info.connect_queue ++ [id]}
|
|
|
|
end)
|
2022-01-11 01:38:08 +08:00
|
|
|
|
2022-03-23 01:25:42 +08:00
|
|
|
if socket.assigns.ref_with_info[ref].count == 1 do
|
2022-02-10 06:15:03 +08:00
|
|
|
Livebook.Session.subscribe_to_runtime_events(
|
|
|
|
socket.assigns.session_id,
|
|
|
|
"js_live",
|
|
|
|
ref,
|
|
|
|
&fastlane_encoder/1,
|
|
|
|
socket.transport_pid
|
|
|
|
)
|
2022-01-11 01:38:08 +08:00
|
|
|
end
|
|
|
|
|
2022-01-06 23:31:26 +08:00
|
|
|
{:noreply, socket}
|
|
|
|
end
|
|
|
|
|
2022-02-08 04:03:25 +08:00
|
|
|
def handle_in("event", raw, socket) do
|
|
|
|
{[event, ref], payload} = transport_decode!(raw)
|
2022-03-23 01:25:42 +08:00
|
|
|
pid = socket.assigns.ref_with_info[ref].pid
|
2022-01-06 23:31:26 +08:00
|
|
|
send(pid, {:event, event, payload, %{origin: self(), ref: ref}})
|
|
|
|
{:noreply, socket}
|
|
|
|
end
|
|
|
|
|
|
|
|
def handle_in("disconnect", %{"ref" => ref}, socket) do
|
|
|
|
socket =
|
2022-03-23 01:25:42 +08:00
|
|
|
if socket.assigns.ref_with_info[ref].count == 1 do
|
2022-01-11 01:38:08 +08:00
|
|
|
Livebook.Session.unsubscribe_from_runtime_events(
|
|
|
|
socket.assigns.session_id,
|
|
|
|
"js_live",
|
|
|
|
ref
|
|
|
|
)
|
|
|
|
|
2022-03-23 01:25:42 +08:00
|
|
|
{_, socket} = pop_in(socket.assigns.ref_with_info[ref])
|
|
|
|
socket
|
2022-01-06 23:31:26 +08:00
|
|
|
else
|
2022-03-23 01:25:42 +08:00
|
|
|
update_in(socket.assigns.ref_with_info[ref], &%{&1 | count: &1.count - 1})
|
2022-01-06 23:31:26 +08:00
|
|
|
end
|
|
|
|
|
|
|
|
{:noreply, socket}
|
|
|
|
end
|
|
|
|
|
|
|
|
@impl true
|
2022-02-10 06:15:03 +08:00
|
|
|
def handle_info({:connect_reply, payload, %{ref: ref}}, socket) do
|
2022-03-23 01:25:42 +08:00
|
|
|
# Multiple connections for the same reference may be establish,
|
|
|
|
# the replies come sequentially and we dispatch them according
|
|
|
|
# to the clients queue
|
|
|
|
|
|
|
|
{id, socket} =
|
|
|
|
get_and_update_in(socket.assigns.ref_with_info[ref].connect_queue, fn [id | queue] ->
|
|
|
|
{id, queue}
|
|
|
|
end)
|
|
|
|
|
|
|
|
with {:error, error} <- try_push(socket, "init:#{ref}:#{id}", nil, payload) do
|
2022-01-06 23:31:26 +08:00
|
|
|
message = "Failed to serialize initial widget data, " <> error
|
2022-04-09 21:48:44 +08:00
|
|
|
push(socket, "error:#{ref}", %{"message" => message, "init" => true})
|
2022-01-06 23:31:26 +08:00
|
|
|
end
|
|
|
|
|
|
|
|
{:noreply, socket}
|
|
|
|
end
|
|
|
|
|
2022-02-10 06:15:03 +08:00
|
|
|
def handle_info({:encoding_error, error, {:event, _event, _payload, %{ref: ref}}}, socket) do
|
|
|
|
message = "Failed to serialize widget data, " <> error
|
|
|
|
push(socket, "error:#{ref}", %{"message" => message})
|
2022-01-06 23:31:26 +08:00
|
|
|
{:noreply, socket}
|
|
|
|
end
|
|
|
|
|
2022-02-08 04:03:25 +08:00
|
|
|
defp try_push(socket, event, meta, payload) do
|
2022-02-10 06:15:03 +08:00
|
|
|
with {:ok, _} <-
|
|
|
|
run_safely(fn ->
|
|
|
|
push(socket, event, transport_encode!(meta, payload))
|
|
|
|
end),
|
|
|
|
do: :ok
|
|
|
|
end
|
|
|
|
|
|
|
|
# In case the payload fails to encode we catch the error
|
|
|
|
defp run_safely(fun) do
|
2022-01-06 23:31:26 +08:00
|
|
|
try do
|
2022-02-10 06:15:03 +08:00
|
|
|
{:ok, fun.()}
|
2022-01-06 23:31:26 +08:00
|
|
|
catch
|
|
|
|
:error, %Protocol.UndefinedError{protocol: Jason.Encoder, value: value} ->
|
|
|
|
{:error, "value #{inspect(value)} is not JSON-serializable, use another data type"}
|
|
|
|
|
|
|
|
:error, error ->
|
|
|
|
{:error, Exception.message(error)}
|
|
|
|
end
|
|
|
|
end
|
2022-02-08 04:03:25 +08:00
|
|
|
|
2022-02-10 06:15:03 +08:00
|
|
|
defp fastlane_encoder({:event, event, payload, %{ref: ref}}) do
|
|
|
|
run_safely(fn ->
|
|
|
|
Phoenix.Socket.V2.JSONSerializer.fastlane!(%Phoenix.Socket.Broadcast{
|
2022-02-28 20:53:33 +08:00
|
|
|
topic: "js_view",
|
2022-02-10 06:15:03 +08:00
|
|
|
event: "event:#{ref}",
|
|
|
|
payload: transport_encode!([event], payload)
|
|
|
|
})
|
|
|
|
end)
|
|
|
|
end
|
|
|
|
|
2022-02-08 04:03:25 +08:00
|
|
|
# A user payload can be either a JSON-serializable term
|
|
|
|
# or a {:binary, info, binary} tuple, where info is a
|
|
|
|
# JSON-serializable term. The channel allows for sending
|
|
|
|
# either maps or binaries, so we need to translare the
|
|
|
|
# payload accordingly
|
|
|
|
|
|
|
|
defp transport_encode!(meta, {:binary, info, binary}) do
|
|
|
|
{:binary, encode!([meta, info], binary)}
|
|
|
|
end
|
|
|
|
|
|
|
|
defp transport_encode!(meta, payload) do
|
|
|
|
%{"root" => [meta, payload]}
|
|
|
|
end
|
|
|
|
|
|
|
|
defp transport_decode!({:binary, raw}) do
|
|
|
|
{[meta, info], binary} = decode!(raw)
|
|
|
|
{meta, {:binary, info, binary}}
|
|
|
|
end
|
|
|
|
|
|
|
|
defp transport_decode!(raw) do
|
|
|
|
%{"root" => [meta, payload]} = raw
|
|
|
|
{meta, payload}
|
|
|
|
end
|
|
|
|
|
|
|
|
defp encode!(meta, binary) do
|
|
|
|
meta = Jason.encode!(meta)
|
|
|
|
meta_size = byte_size(meta)
|
|
|
|
<<meta_size::size(32), meta::binary, binary::binary>>
|
|
|
|
end
|
|
|
|
|
|
|
|
defp decode!(raw) do
|
|
|
|
<<meta_size::size(32), meta::binary-size(meta_size), binary::binary>> = raw
|
|
|
|
meta = Jason.decode!(meta)
|
|
|
|
{meta, binary}
|
|
|
|
end
|
2022-01-06 23:31:26 +08:00
|
|
|
end
|