EventBroker benchmark

Mix.install([
  {:vega_lite, "~> 0.1.6"},
  {:kino_vega_lite, "~> 0.1.11"}
])

Introduction

What's in the Livebook

This Livebook serves as a tool to measure the performance of the EventBroker.

It has three sections, each answering one question.

  • Subscribe latency: measure how long it takes on average for a subscription to be made, while varying the number of existing subscriptions.
  • Unsubscribe latency: measure how long it takes on average for a subscriber to unsubscribe, while varying the number of existing subscriptions.
  • Subscribes per second: measure how long it takes to subscribe a given amount of subscribers, and determine the average subscriptions per second off of this.

At the bottom of the file is a cell to do some profiling. This is meant as a note on how to do actually do profiling.

Running the benchmarks

To run the experiments, a version of the Anoma application must be running on your local machine. Start the application in distributed mode as follows.

iex --name node@127.0.0.1 --cookie monster -S mix

🚨 If you use another node name or another cookie, change the values in the cell below.

Then run each cell in this livebook against that node.

# these have to be atoms!
node_name = :"node@127.0.0.1"
node_cookie = :monster

Filters

require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule Filters do
    alias EventBroker.Event
    use EventBroker.DefFilter

    deffilter FilterA, value: integer() do
      %EventBroker.Event{body: %Event{body: %{value_a: ^value}}} -> true
      _ -> false
    end

    deffilter FilterB, value: integer() do
      %EventBroker.Event{body: %Event{body: %{value_b: ^value}}} -> true
      _ -> false
    end

    deffilter FilterC, value: integer() do
      %EventBroker.Event{body: %Event{body: %{value_b: ^value}}} -> true
      _ -> false
    end
  end
  """,
  file: __ENV__.file
)

Helpers

require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule Helpers do
    # return the time in microseconds (μs)
    def measure(proc) do
      :timer.tc(proc)
    end

    def average_measure(proc, n) do
      {time, res} =
        for _ <- 1..n do
          measure(proc)
        end
        |> Enum.reduce({0, []}, fn {time, res}, {acc_time, acc_res} ->
          {time + acc_time, [res | acc_res]}
        end)

      {time / n, res}
    end

    def do_n_times(proc, n) do
      for _ <- 1..n do
        proc.()
      end
    end
  end
  """,
  file: __ENV__.file
)
require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule Examples do
    def subscriber(filters \\ []) do
      receive do
        {{:subscribe, filters}, from, ref} ->
          EventBroker.subscribe_me(filters)
          send(from, {:ok, ref})
          subscriber(filters)

        {:unsubscribe, from, ref} ->
          EventBroker.unsubscribe_me([])
          send(from, {:ok, ref})
          subscriber(filters)

        {:exit, from, ref} ->
          send(from, {:ok, ref})
          :ok
      end
    end

    def create_subscriber() do
      spawn_link(fn ->
        subscriber()
      end)
    end

    def sync_send(subscriber, message) do
      ref = make_ref()
      send(subscriber, {message, self(), ref})

      receive do
        {:ok, ^ref} ->
          :ok
      end
    end

    def async_send(subscriber, message) do
      ref = make_ref()
      send(subscriber, {message, self(), ref})
    end

    def subscribe(subscriber, filters \\ []) do
      sync_send(subscriber, {:subscribe, filters})
      subscriber
    end

    def unsubscribe(subscriber) do
      sync_send(subscriber, :unsubscribe)
      subscriber
    end

    def unsubscribe_async(subscriber) do
      async_send(subscriber, :unsubscribe)
      subscriber
    end

    def terminate_subscriber(subscriber) do
      sync_send(subscriber, :exit)
      subscriber
    end
  end
  """,
  file: __ENV__.file
)

Subscribe latency

The test below creates a number of subscribers, and measures how long it takes for a subscriber to become subscribed.

require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)

subscribes =
  Kino.RPC.eval_string(
    node,
    ~S"""
    import Helpers
    import Examples

    iterations = 10
    step_size = 100

    # the test to run for a given subscriber count and a given list of filters.
    test_subscribers = fn subscriber_count, filters ->
      {μs, results} =
        average_measure(
          fn ->
            create_subscriber()
            |> subscribe(filters)
          end,
          subscriber_count
        )

      # delete the subscribers again
      Enum.each(results, &terminate_subscriber/1)

      %{subscribers: subscriber_count, time: μs}
    end

    # test with no filters
    no_filter =
      for iteration <- 1..iterations do
        subscriber_count = iteration * step_size
        filters = []

        test_subscribers.(subscriber_count, filters)
      end

    # test with 1 filter
    one_filter =
      for iteration <- 1..iterations do
        subscriber_count = iteration * step_size
        filters = [%Filters.FilterA{value: 1}]

        test_subscribers.(subscriber_count, filters)
      end

    # test with 2 filters
    two_filter =
      for iteration <- 1..iterations do
        subscriber_count = iteration * step_size
        filters = [%Filters.FilterA{value: 1}, %Filters.FilterB{value: 1}]

        test_subscribers.(subscriber_count, filters)
      end

    # test with 3 filters
    three_filter =
      for iteration <- 1..iterations do
        subscriber_count = iteration * step_size
        filters = [%Filters.FilterA{value: 1}, %Filters.FilterB{value: 1}, %Filters.FilterC{value: 1}]

        test_subscribers.(subscriber_count, filters)
      end

    {no_filter, one_filter, two_filter, three_filter}
    """,
    file: __ENV__.file
  )
plots = fn data ->
  VegaLite.new()
  |> VegaLite.data_from_values(data, only: ["subscribers", "time"])
  |> VegaLite.mark(:line, point: [fill: "red"])
  |> VegaLite.encode_field(:x, "subscribers", type: :quantitative)
  |> VegaLite.encode_field(:y, "time", type: :quantitative, title: "Time (μs)")
end

# the first measurement makes the chart unreadable, so its dropped
no_filters = plots.(Enum.drop(elem(subscribes, 0), 1))
one_filters = plots.(Enum.drop(elem(subscribes, 1), 1))
two_filters = plots.(Enum.drop(elem(subscribes, 2), 1))
three_filters = plots.(Enum.drop(elem(subscribes, 3), 1))

VegaLite.new(width: 700)
|> VegaLite.layers([
  no_filters,
  one_filters,
  two_filters,
  three_filters
])

Unsubscribe Latency

The test below creates a number of subscribers, and measures how long it takes for a subscriber to become unsubscribed.

require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)

subscribes =
  Kino.RPC.eval_string(
    node,
    ~S"""
    import Helpers
    import Examples

    iterations = 10
    step_size = 100

    # the test to run for a given subscriber count and a given list of filters.
    test_subscribers = fn subscriber_count, filters ->
      # create the subscribers
      subscribers = do_n_times(fn -> create_subscriber() |> subscribe(filters) end, subscriber_count)

      # unsubscribe all of them and measure
      {μs, results} = measure(fn -> Enum.map(subscribers, &unsubscribe/1) end)

      # delete the subscribers again
      Enum.each(results, &terminate_subscriber/1)

      %{subscribers: subscriber_count, time: μs}
    end

    # test with no filters
    no_filter =
      for iteration <- 1..iterations do
        subscriber_count = iteration * step_size
        filters = []

        test_subscribers.(subscriber_count, filters)
      end

    # test with 1 filter
    one_filter =
      for iteration <- 1..iterations do
        subscriber_count = iteration * step_size
        filters = [%Filters.FilterA{value: 1}]

        test_subscribers.(subscriber_count, filters)
      end

    # test with 2 filters
    two_filter =
      for iteration <- 1..iterations do
        subscriber_count = iteration * step_size
        filters = [%Filters.FilterA{value: 1}, %Filters.FilterB{value: 1}]

        test_subscribers.(subscriber_count, filters)
      end

    # test with 3 filters
    three_filter =
      for iteration <- 1..iterations do
        subscriber_count = iteration * step_size
        filters = [%Filters.FilterA{value: 1}, %Filters.FilterB{value: 1}, %Filters.FilterC{value: 1}]

        test_subscribers.(subscriber_count, filters)
      end

    {no_filter, one_filter, two_filter, three_filter}
    """,
    file: __ENV__.file
  )

Interpretation

The chart below shows that the time to unsubscribe increases exponentially with the number of nodes.

The red line is the actual execution time for the unsubscribe operation. The green line is the same line, but logarithmized.

The log line is flat, hence the execution time for unsubscribe is exponential.

plots = fn data ->
  VegaLite.new()
  |> VegaLite.data_from_values(data, only: ["subscribers", "time"])
  |> VegaLite.mark(:line, point: [fill: "red"])
  |> VegaLite.encode_field(:x, "subscribers", type: :quantitative)
  |> VegaLite.encode_field(:y, "time", type: :quantitative, title: "Time (μs)")
end

# the first measurement makes the chart unreadable, so its dropped
no_filters = plots.(Enum.drop(elem(subscribes, 0), 1))
one_filters = plots.(Enum.drop(elem(subscribes, 1), 1))
two_filters = plots.(Enum.drop(elem(subscribes, 2), 1))
three_filters = plots.(Enum.drop(elem(subscribes, 3), 1))

VegaLite.new(width: 700)
|> VegaLite.layers([
  no_filters,
  one_filters,
  two_filters,
  three_filters
])

Subscribes per second

require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)

subscribes_per_second =
  Kino.RPC.eval_string(
    node,
    ~S"""
    import Helpers
    import Examples


    iterations = 10
    step_size = 100

    make_n_subscriptions = fn count ->
      {time, subscribers} =
        measure(fn ->
          for _ <- 1..count do
            create_subscriber()
            |> subscribe()
          end
        end)

      # delete the subscribers again
      Enum.each(subscribers, &terminate_subscriber/1)

      # return the total time
      time
    end

    # warmup
    make_n_subscriptions.(100)

    # measure
    for i <- 1..iterations do
      subscription_count = i * step_size

      # time is in μs
      time = make_n_subscriptions.(subscription_count)

      # if `subscription_count` subscriptions takes `time` μs
      # then on average a single subscription takes `time / subscription_count` μs.
      average_time = time / subscription_count

      # if the average time for a subscription is `time / subscription_count`
      # then the `EventBroker` can handle `1_000_000 / (time / subscription_count)`
      # subscriptions per second.
      rate = 1_000_000 / average_time

      %{
        total_time: time,
        subscription_count: subscription_count,
        average_time: average_time,
        rate: rate
      }
    end
    """,
    file: __ENV__.file
  )
# chart plotting the total exeuction time for the amount of subscribers
total_time =
  VegaLite.new()
  |> VegaLite.mark(:line, point: [fill: "red"], stroke: "red")
  |> VegaLite.encode_field(:y, "total_time",
    type: :quantitative,
    title: "Total Time (μs)",
    axis: [title_color: "red"]
  )

# chart plotting the average time it takes 1 subscription
average_time =
  VegaLite.new()
  |> VegaLite.mark(:line, point: [fill: "green"], stroke: "green", axis: [title_color: "green"])
  |> VegaLite.encode_field(:y, "average_time",
    type: :quantitative,
    title: "Average per subscription (μs)",
    axis: [title_color: "green"]
  )

# create plots
VegaLite.new(width: 700)
|> VegaLite.encode_field(:x, "subscription_count", type: :quantitative)
|> VegaLite.data_from_values(subscribes_per_second)
|> VegaLite.layers([average_time, total_time])
|> VegaLite.resolve(:scale, y: :independent)
# chart plotting the average time it takes 1 subscription
average_time =
  VegaLite.new()
  |> VegaLite.mark(:line, point: [fill: "green"], stroke: "green", axis: [title_color: "green"])
  |> VegaLite.encode_field(:y, "rate",
    type: :quantitative,
    title: "Subscriptions per second",
    axis: [title_color: "green"]
  )

# create plots
VegaLite.new(width: 700)
|> VegaLite.encode_field(:x, "subscription_count", type: :quantitative)
|> VegaLite.data_from_values(subscribes_per_second)
|> VegaLite.layers([average_time])
|> VegaLite.resolve(:scale, y: :independent)

Profiling

require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  import Helpers
  import Examples


  subscriber_count = 50000

  # create the subscribers
  subscribers = do_n_times(fn -> create_subscriber() |> subscribe() end, subscriber_count)

  # profile the unsubscribe
  :eprof.start_profiling([Process.whereis(EventBroker.Registry)])

  # unsubscribe all of them and measure
  {μs, results} = measure(fn -> Enum.map(subscribers, &unsubscribe/1) end)

  # stop profiling
  :eprof.stop_profiling()

  # delete the subscribers again
  Enum.each(results, &terminate_subscriber/1)


  :eprof.analyze()
  μs
  """,
  file: __ENV__.file
)