From ccc64876a8e1ed755cce94b3660c19fd279c29ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Mon, 7 Feb 2022 21:03:25 +0100 Subject: [PATCH] Add support for widget binary payloads (#982) * Add support for widget binary payloads * Migrate LV auth to a separate hook * Properly set user buffer when encoding --- assets/js/js_output/index.js | 107 +++++++++++++----- .../channels/js_output_channel.ex | 47 +++++++- lib/livebook_web/channels/socket.ex | 13 +-- lib/livebook_web/endpoint.ex | 15 ++- lib/livebook_web/live/hooks/auth_hook.ex | 14 +++ .../live/{ => hooks}/user_hook.ex | 2 +- lib/livebook_web/router.ex | 2 +- .../channels/js_output_channel_test.exs | 39 ++++++- 8 files changed, 187 insertions(+), 52 deletions(-) create mode 100644 lib/livebook_web/live/hooks/auth_hook.ex rename lib/livebook_web/live/{ => hooks}/user_hook.ex (97%) diff --git a/assets/js/js_output/index.js b/assets/js/js_output/index.js index 90dab9546..a8421115e 100644 --- a/assets/js/js_output/index.js +++ b/assets/js/js_output/index.js @@ -1,3 +1,4 @@ +import { Socket } from "phoenix"; import { getAttributeOrThrow } from "../lib/attribute"; import { randomToken, sha256Base64 } from "../lib/utils"; @@ -41,10 +42,7 @@ const JSOutput = { errorContainer: null, }; - const channel = getChannel( - this.__liveSocket.getSocket(), - this.props.sessionId - ); + const channel = getChannel(this.props.sessionId); // When cells/sections are reordered, morphdom detaches and attaches // the relevant elements in the DOM. Consequently the output element @@ -118,7 +116,8 @@ const JSOutput = { this.el.dispatchEvent(event); } else if (message.type === "event") { const { event, payload } = message; - channel.push("event", { event, payload, ref: this.props.ref }); + const raw = transportEncode([event, this.props.ref], payload); + channel.push("event", raw); } } }; @@ -147,20 +146,18 @@ const JSOutput = { ref: this.props.ref, }); - const initRef = channel.on(`init:${this.props.ref}`, ({ data }) => { + const initRef = channel.on(`init:${this.props.ref}`, (raw) => { + const [, payload] = transportDecode(raw); + this.state.childReadyPromise.then(() => { - postMessage({ type: "init", data }); + postMessage({ type: "init", data: payload }); }); }); - const eventRef = channel.on( - `event:${this.props.ref}`, - ({ event, payload }) => { - this.state.childReadyPromise.then(() => { - postMessage({ type: "event", event, payload }); - }); - } - ); + const eventRef = channel.on(`event:${this.props.ref}`, (raw) => { + const [[event], payload] = transportDecode(raw); + postMessage({ type: "event", event, payload }); + }); const errorRef = channel.on(`error:${this.props.ref}`, ({ message }) => { if (!this.state.errorContainer) { @@ -188,13 +185,7 @@ const JSOutput = { this.disconnectObservers(); this.state.iframe.remove(); - const channel = getChannel( - this.__liveSocket.getSocket(), - this.props.sessionId, - { - create: false, - } - ); + const channel = getChannel(this.props.sessionId, { create: false }); if (channel) { this.state.channelUnsubscribe(); @@ -213,13 +204,19 @@ function getProps(hook) { }; } +const csrfToken = document + .querySelector("meta[name='csrf-token']") + .getAttribute("content"); +const socket = new Socket("/socket", { params: { _csrf_token: csrfToken } }); + let channel = null; /** * Returns channel used for all JS outputs in the current session. */ -function getChannel(socket, sessionId, { create = true } = {}) { +function getChannel(sessionId, { create = true } = {}) { if (!channel && create) { + socket.connect(); channel = socket.channel("js_output", { session_id: sessionId }); channel.join(); } @@ -231,10 +228,8 @@ function getChannel(socket, sessionId, { create = true } = {}) { * Leaves the JS outputs channel tied to the current session. */ export function leaveChannel() { - if (channel) { - channel.leave(); - channel = null; - } + socket.disconnect(); + channel = null; } /** @@ -330,4 +325,62 @@ function verifyIframeSource() { return iframeVerificationPromise; } +function transportEncode(meta, payload) { + if ( + Array.isArray(payload) && + payload[1] && + payload[1].constructor === ArrayBuffer + ) { + const [info, buffer] = payload; + return encode([meta, info], buffer); + } else { + return { root: [meta, payload] }; + } +} + +function transportDecode(raw) { + if (raw.constructor === ArrayBuffer) { + const [[meta, info], buffer] = decode(raw); + return [meta, [info, buffer]]; + } else { + const { + root: [meta, payload], + } = raw; + return [meta, payload]; + } +} + +const HEADER_LENGTH = 4; + +function encode(meta, buffer) { + const encoder = new TextEncoder(); + const metaArray = encoder.encode(JSON.stringify(meta)); + + const raw = new ArrayBuffer( + HEADER_LENGTH + metaArray.byteLength + buffer.byteLength + ); + const view = new DataView(raw); + + view.setUint32(0, metaArray.byteLength); + new Uint8Array(raw, HEADER_LENGTH, metaArray.byteLength).set(metaArray); + new Uint8Array(raw, HEADER_LENGTH + metaArray.byteLength).set( + new Uint8Array(buffer) + ); + + return raw; +} + +function decode(raw) { + const view = new DataView(raw); + const metaArrayLength = view.getUint32(0); + + const metaArray = new Uint8Array(raw, HEADER_LENGTH, metaArrayLength); + const buffer = raw.slice(HEADER_LENGTH + metaArrayLength); + + const decoder = new TextDecoder(); + const meta = JSON.parse(decoder.decode(metaArray)); + + return [meta, buffer]; +} + export default JSOutput; diff --git a/lib/livebook_web/channels/js_output_channel.ex b/lib/livebook_web/channels/js_output_channel.ex index 3469f1b02..867334518 100644 --- a/lib/livebook_web/channels/js_output_channel.ex +++ b/lib/livebook_web/channels/js_output_channel.ex @@ -24,7 +24,8 @@ defmodule LivebookWeb.JSOutputChannel do {:noreply, socket} end - def handle_in("event", %{"event" => event, "payload" => payload, "ref" => ref}, socket) do + def handle_in("event", raw, socket) do + {[event, ref], payload} = transport_decode!(raw) pid = socket.assigns.ref_with_pid[ref] send(pid, {:event, event, payload, %{origin: self(), ref: ref}}) {:noreply, socket} @@ -52,7 +53,7 @@ defmodule LivebookWeb.JSOutputChannel do @impl true def handle_info({:connect_reply, data, %{ref: ref}}, socket) do - with {:error, error} <- try_push(socket, "init:#{ref}", %{"data" => data}) do + with {:error, error} <- try_push(socket, "init:#{ref}", nil, data) do message = "Failed to serialize initial widget data, " <> error push(socket, "error:#{ref}", %{"message" => message}) end @@ -61,8 +62,7 @@ defmodule LivebookWeb.JSOutputChannel do end def handle_info({:event, event, payload, %{ref: ref}}, socket) do - with {:error, error} <- - try_push(socket, "event:#{ref}", %{"event" => event, "payload" => payload}) do + with {:error, error} <- try_push(socket, "event:#{ref}", [event], payload) do message = "Failed to serialize event payload, " <> error push(socket, "error:#{ref}", %{"message" => message}) end @@ -71,8 +71,9 @@ defmodule LivebookWeb.JSOutputChannel do end # In case the payload fails to encode we catch the error - defp try_push(socket, event, payload) do + defp try_push(socket, event, meta, payload) do try do + payload = transport_encode!(meta, payload) push(socket, event, payload) catch :error, %Protocol.UndefinedError{protocol: Jason.Encoder, value: value} -> @@ -82,4 +83,40 @@ defmodule LivebookWeb.JSOutputChannel do {:error, Exception.message(error)} end end + + # A user payload can be either a JSON-serializable term + # or a {:binary, info, binary} tuple, where info is a + # JSON-serializable term. The channel allows for sending + # either maps or binaries, so we need to translare the + # payload accordingly + + defp transport_encode!(meta, {:binary, info, binary}) do + {:binary, encode!([meta, info], binary)} + end + + defp transport_encode!(meta, payload) do + %{"root" => [meta, payload]} + end + + defp transport_decode!({:binary, raw}) do + {[meta, info], binary} = decode!(raw) + {meta, {:binary, info, binary}} + end + + defp transport_decode!(raw) do + %{"root" => [meta, payload]} = raw + {meta, payload} + end + + defp encode!(meta, binary) do + meta = Jason.encode!(meta) + meta_size = byte_size(meta) + <> + end + + defp decode!(raw) do + <> = raw + meta = Jason.decode!(meta) + {meta, binary} + end end diff --git a/lib/livebook_web/channels/socket.ex b/lib/livebook_web/channels/socket.ex index cb5c8b903..cd2b12018 100644 --- a/lib/livebook_web/channels/socket.ex +++ b/lib/livebook_web/channels/socket.ex @@ -1,26 +1,19 @@ defmodule LivebookWeb.Socket do use Phoenix.Socket - # App channels channel "js_output", LivebookWeb.JSOutputChannel - # LiveView channels - channel "lvu:*", Phoenix.LiveView.UploadChannel - channel "lv:*", Phoenix.LiveView.Channel - @impl true - def connect(params, socket, info) do + def connect(_params, socket, info) do auth_mode = Livebook.Config.auth_mode() if LivebookWeb.AuthPlug.authenticated?(info.session || %{}, info.uri.port, auth_mode) do - Phoenix.LiveView.Socket.connect(params, socket, info) + {:ok, socket} else :error end end @impl true - def id(socket) do - Phoenix.LiveView.Socket.id(socket) - end + def id(_socket), do: nil end diff --git a/lib/livebook_web/endpoint.ex b/lib/livebook_web/endpoint.ex index 514e104ed..81ecc02fa 100644 --- a/lib/livebook_web/endpoint.ex +++ b/lib/livebook_web/endpoint.ex @@ -11,11 +11,16 @@ defmodule LivebookWeb.Endpoint do same_site: "Lax" ] - socket "/live", LivebookWeb.Socket, - # Don't check the origin as we don't know how the web app is gonna be accessed. - # It runs locally, but may be exposed via IP or domain name. - # The WebSocket connection is already protected from CSWSH by using CSRF token. - websocket: [check_origin: false, connect_info: [:user_agent, :uri, session: @session_options]] + # Don't check the origin as we don't know how the web app is gonna be accessed. + # It runs locally, but may be exposed via IP or domain name. The WebSocket + # connection is already protected from CSWSH by using CSRF token. + @websocket_options [ + check_origin: false, + connect_info: [:user_agent, :uri, session: @session_options] + ] + + socket "/live", Phoenix.LiveView.Socket, websocket: @websocket_options + socket "/socket", LivebookWeb.Socket, websocket: @websocket_options # We use Escript for distributing Livebook, so we don't have access to the static # files at runtime in the prod environment. To overcome this we load contents of diff --git a/lib/livebook_web/live/hooks/auth_hook.ex b/lib/livebook_web/live/hooks/auth_hook.ex new file mode 100644 index 000000000..ad7ac9070 --- /dev/null +++ b/lib/livebook_web/live/hooks/auth_hook.ex @@ -0,0 +1,14 @@ +defmodule LivebookWeb.AuthHook do + import Phoenix.LiveView + + def on_mount(:default, _params, session, socket) do + uri = get_connect_info(socket, :uri) + auth_mode = Livebook.Config.auth_mode() + + if LivebookWeb.AuthPlug.authenticated?(session || %{}, uri.port, auth_mode) do + {:cont, socket} + else + {:halt, socket} + end + end +end diff --git a/lib/livebook_web/live/user_hook.ex b/lib/livebook_web/live/hooks/user_hook.ex similarity index 97% rename from lib/livebook_web/live/user_hook.ex rename to lib/livebook_web/live/hooks/user_hook.ex index 1211116a6..dec7b815e 100644 --- a/lib/livebook_web/live/user_hook.ex +++ b/lib/livebook_web/live/hooks/user_hook.ex @@ -1,4 +1,4 @@ -defmodule LivebookWeb.CurrentUserHook do +defmodule LivebookWeb.UserHook do import Phoenix.LiveView alias Livebook.Users.User diff --git a/lib/livebook_web/router.ex b/lib/livebook_web/router.ex index 9e4eb27b4..8d6e54fb0 100644 --- a/lib/livebook_web/router.ex +++ b/lib/livebook_web/router.ex @@ -39,7 +39,7 @@ defmodule LivebookWeb.Router do get "/sessions/:id/assets/:hash/*file_parts", SessionController, :show_asset end - live_session :default, on_mount: LivebookWeb.CurrentUserHook do + live_session :default, on_mount: [LivebookWeb.AuthHook, LivebookWeb.UserHook] do scope "/", LivebookWeb do pipe_through [:browser, :auth] diff --git a/test/livebook_web/channels/js_output_channel_test.exs b/test/livebook_web/channels/js_output_channel_test.exs index 41cc0e1db..002db2ed3 100644 --- a/test/livebook_web/channels/js_output_channel_test.exs +++ b/test/livebook_web/channels/js_output_channel_test.exs @@ -20,13 +20,13 @@ defmodule LivebookWeb.JSOutputChannelTest do assert_receive {:connect, from, %{}} send(from, {:connect_reply, [1, 2, 3], %{ref: "1"}}) - assert_push "init:1", %{"data" => [1, 2, 3]} + assert_push "init:1", %{"root" => [nil, [1, 2, 3]]} end test "sends events received from widget server to the client", %{socket: socket} do send(socket.channel_pid, {:event, "ping", [1, 2, 3], %{ref: "1"}}) - assert_push "event:1", %{"event" => "ping", "payload" => [1, 2, 3]} + assert_push "event:1", %{"root" => [["ping"], [1, 2, 3]]} end test "sends client events to the corresponding widget server", %{socket: socket} do @@ -35,11 +35,44 @@ defmodule LivebookWeb.JSOutputChannelTest do assert_receive {:connect, from, %{}} send(from, {:connect_reply, [1, 2, 3], %{ref: "1"}}) - push(socket, "event", %{"event" => "ping", "payload" => [1, 2, 3], "ref" => "1"}) + push(socket, "event", %{"root" => [["ping", "1"], [1, 2, 3]]}) assert_receive {:event, "ping", [1, 2, 3], %{origin: _origin}} end + describe "binary payload" do + test "initial data", %{socket: socket} do + push(socket, "connect", %{"session_token" => session_token(), "ref" => "1"}) + + assert_receive {:connect, from, %{}} + payload = {:binary, %{message: "hey"}, <<1, 2, 3>>} + send(from, {:connect_reply, payload, %{ref: "1"}}) + + assert_push "init:1", {:binary, <<24::size(32), "[null,{\"message\":\"hey\"}]", 1, 2, 3>>} + end + + test "from server to client", %{socket: socket} do + payload = {:binary, %{message: "hey"}, <<1, 2, 3>>} + send(socket.channel_pid, {:event, "ping", payload, %{ref: "1"}}) + + assert_push "event:1", + {:binary, <<28::size(32), "[[\"ping\"],{\"message\":\"hey\"}]", 1, 2, 3>>} + end + + test "form client to server", %{socket: socket} do + push(socket, "connect", %{"session_token" => session_token(), "ref" => "1"}) + + assert_receive {:connect, from, %{}} + send(from, {:connect_reply, [1, 2, 3], %{ref: "1"}}) + + raw = {:binary, <<32::size(32), "[[\"ping\",\"1\"],{\"message\":\"hey\"}]", 1, 2, 3>>} + push(socket, "event", raw) + + payload = {:binary, %{"message" => "hey"}, <<1, 2, 3>>} + assert_receive {:event, "ping", ^payload, %{origin: _origin}} + end + end + defp session_token() do Phoenix.Token.sign(LivebookWeb.Endpoint, "js output", %{pid: self()}) end