Using GenStage for a Batching Pipeline

While developing a new app for data loss prevention (DLP) integration in Slack, I came across a problem where GenStage was the solution.

GenStage TL;DR

GenStage is a behavior that lets you create a data transformation pipeline with back-pressure between stages: consumers ask producers for events, and producers emit events only in response to that demand. This lets you design a system with a built-in mechanism for dealing with demand, so it doesn't overload itself.
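As a quick illustration of that back-pressure (a minimal sketch, not code from the app below), here's a counter producer that only emits events when its consumer asks for them:

defmodule CounterProducer do
  use GenStage

  def start_link(start) do
    GenStage.start_link(__MODULE__, start, name: __MODULE__)
  end

  def init(start), do: {:producer, start}

  # Runs only when downstream demand arrives, so the producer
  # never outruns its consumer
  def handle_demand(demand, counter) do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule PrinterConsumer do
  use GenStage

  def start_link(_args) do
    GenStage.start_link(__MODULE__, :ok)
  end

  # max_demand caps how many events are requested at once
  def init(:ok), do: {:consumer, :ok, subscribe_to: [{CounterProducer, max_demand: 10}]}

  def handle_events(events, _from, state) do
    IO.inspect(events)
    {:noreply, [], state}
  end
end

Starting CounterProducer and then PrinterConsumer prints events in chunks of at most ten, no matter how fast the producer could count.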

The Problem

The app relies on an external service to inspect content (Slack messages and files) for sensitive information. However, this service imposes limits on consumers: at most 10 requests per second, and at most 100 items inspected in a single request. To maximize throughput, I need to match those limits, analyzing up to 10 × 100 = 1,000 Slack messages per second.

GenStage provides the tools to solve this problem seamlessly.

The Solution

[Diagram: GenStage Pipeline]

I've designed the pipeline with 3 stages:

  • Stage 1 (The Collector) - The Collector is responsible for producing events to be consumed later in the pipeline, making it a producer in GenStage terminology. This stage is ultimately just a queue. Each item in the queue contains some metadata related to a Slack message or file, which I call an Event.

  • Stage 2 (The Inspector) - The Inspector stage acts as a producer/consumer, demanding a maximum of 100 events from the Collector every 100ms and transforming them for the next stage. The list of events comes in as a batch and is processed as a batch: information from each event is map-reduced into a single request that is sent to the external service for inspection. The response contains information about each event. The events and their inspection results are zipped together and sent off to be processed by the last stage.

  • Stage 3 (The Reporters) - The Reporters stage is the final stage in the pipeline, which makes it a consumer. It takes advantage of the ConsumerSupervisor behavior from GenStage, which makes it easy to create a "worker pool" where a new process is spawned for each event coming from the previous stage. This stage simply creates alerts and posts them in Slack.

Code

Here's the actual code from the pipeline, with some domain-specific processing left out.

  • The Collector
defmodule CrimsonAegis.Batcher.Collector do
  @moduledoc """
  Buffers Slack events in a FIFO queue to be consumed by a later stage with GenStage.

  This process is a Producer for this GenStage pipeline.

  [**Collector**] <- Inspector <- Reporter
  """

  use GenStage
  
  alias CrimsonAegis.Batcher.Queue

  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:producer, Queue.new()}
  end

  # Gives events for the next stage to process when requested
  def handle_demand(demand, queue) when demand > 0 do
    {items, updated_queue} = Queue.take(queue, demand)

    {:noreply, items, updated_queue}
  end

  # Adds an event
  def handle_cast({:push, event}, queue) do
    updated_queue = Queue.enqueue(queue, event)

    {:noreply, [], updated_queue}
  end

  @doc """
  Adds an event to the buffer queue.
  """
  def add(event) do
    GenStage.cast(__MODULE__, {:push, event})
  end
end
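The Queue module isn't shown in the post. Assuming it's a thin FIFO wrapper exposing the new/0, enqueue/2, and take/2 functions the Collector calls, a minimal sketch on top of Erlang's :queue could look like this:

defmodule CrimsonAegis.Batcher.Queue do
  @moduledoc """
  Minimal FIFO wrapper around Erlang's :queue.

  A sketch; the real module isn't shown in the post.
  """

  def new, do: :queue.new()

  def enqueue(queue, item), do: :queue.in(item, queue)

  # Dequeues up to `count` items, returning {items, updated_queue}
  def take(queue, count), do: do_take(queue, count, [])

  defp do_take(queue, 0, acc), do: {Enum.reverse(acc), queue}

  defp do_take(queue, count, acc) do
    case :queue.out(queue) do
      {{:value, item}, rest} -> do_take(rest, count - 1, [item | acc])
      {:empty, rest} -> {Enum.reverse(acc), rest}
    end
  end
end

Producers elsewhere in the app would then push work into the pipeline with CrimsonAegis.Batcher.Collector.add(event).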
  • The Inspector
defmodule CrimsonAegis.Batcher.Inspector do
  @moduledoc """
  Takes a batch of items periodically to be inspected and sends them off in a single request.

  In production, there are 10 of these processes. Each one asks for demand every second, offset from the others by 100ms. The staggering accounts for the latency of a single request.

  This stage acts as a producer/consumer in this GenStage pipeline.

  Collector <- [**Inspector**] <- Reporter
  """
  use GenStage

  alias CrimsonAegis.Batcher.Event

  def start_link(args) do
    GenStage.start_link(__MODULE__, args, name: args[:name])
  end

  def init(args) do
    # Demand size when asking; default to the service's 100-item request cap
    batch_size = args[:batch_size] || 100
    # Delay the first ask
    sync_offset = args[:sync_offset] || 0
    # Ask for events on a given interval
    interval = args[:inspect_interval]

    Process.send_after(self(), :ask, sync_offset)

    state = %{batch_size: batch_size, interval: interval}

    # Subscribe to the Collector as a producer/consumer
    {:producer_consumer, state, subscribe_to: [CrimsonAegis.Batcher.Collector]}
  end

  # Set the subscription to manual to control when to ask for events
  def handle_subscribe(:producer, _opts, from, state) do
    {:manual, Map.put(state, :producer, from)}
  end

  # Use automatic demand for subscriptions from downstream consumers
  def handle_subscribe(:consumer, _, _, state) do
    {:automatic, state}
  end

  # Process the events coming from the producer
  def handle_events(events, _, state) do
    items = Enum.map(events, &transform_event/1)

    # The results are a list of maps that can be operated on
    results = DLP.inspect(items)

    # Zip each event with its inspection result for the Reporter stage
    {:noreply, Enum.zip(events, results), state}
  end

  # Requests a certain amount of items to process on a set interval
  def handle_info(:ask, %{batch_size: batch_size, interval: interval, producer: producer} = state) do
    # Request a batch of events with a max batch size
    GenStage.ask(producer, batch_size)

    # Schedule the next request
    Process.send_after(self(), :ask, interval)

    {:noreply, [], state}
  end

  # Transforms the event into meaningful data for a request
  defp transform_event(%Event{} = event) do
    # ...
  end
end
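The Reporter in the next section pattern-matches each result against %{"findings" => findings}, so each map returned by DLP.inspect presumably carries a "findings" key, something like [%{"findings" => [...]}, %{"findings" => [...]}]; the exact shape isn't shown in the post.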
  • The Reporter
defmodule CrimsonAegis.Batcher.ReporterSupervisor do
  @moduledoc """
  Supervisor for Reporter pool.
  """
  use ConsumerSupervisor

  def start_link(args) do
    ConsumerSupervisor.start_link(__MODULE__, args)
  end

  def init(args) do
    children = [
      %{
        id: CrimsonAegis.Batcher.Reporter,
        start: {CrimsonAegis.Batcher.Reporter, :start_link, []},
        restart: :temporary
      }
    ]

    ConsumerSupervisor.init(children, strategy: :one_for_one, subscribe_to: args[:producers])
  end
end

defmodule CrimsonAegis.Batcher.Reporter do
  @moduledoc """
  Takes an event and its findings and handles them by creating appropriate alerts in Slack.
  A new process is spun up every time an event needs to be handled.

  This process acts as a Consumer in this GenStage pipeline.

  Collector <- Inspector <- [**Reporter**]
  """

  def start_link({event, %{"findings" => findings}}) do
    Task.start_link(fn ->
      findings
      |> some_biz_logic()
      |> CrimsonAegis.Slack.Utils.create_alert_for_findings(event.event, event.event_type)
    end)
  end
end
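The post stops short of showing how the stages are wired together. A hypothetical supervision tree (the supervisor module and child specs here are assumptions built on the code above) could start the Collector, ten staggered Inspectors, and the ReporterSupervisor like this:

defmodule CrimsonAegis.Batcher.Supervisor do
  use Supervisor

  def start_link(_args) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    inspector_names = for n <- 0..9, do: :"inspector_#{n}"

    # Ten Inspectors, each asking for up to 100 events once per second,
    # staggered 100ms apart
    inspectors =
      for {name, n} <- Enum.with_index(inspector_names) do
        Supervisor.child_spec(
          {CrimsonAegis.Batcher.Inspector,
           name: name, batch_size: 100, sync_offset: n * 100, inspect_interval: 1_000},
          id: name
        )
      end

    # The Collector defines start_link/0, so give it an explicit child spec
    collector = %{
      id: CrimsonAegis.Batcher.Collector,
      start: {CrimsonAegis.Batcher.Collector, :start_link, []}
    }

    children =
      [collector] ++
        inspectors ++
        [{CrimsonAegis.Batcher.ReporterSupervisor, producers: inspector_names}]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

With this wiring, the ten Inspectors collectively make at most 10 requests per second of up to 100 items each, matching the external service's limits.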

Wrap Up

Elixir's GenStage provides a nice toolset for building a processing pipeline that can easily include batch processing with very little code. Be sure to read the GenStage docs for more information.

#elixir   •   #gen_stage