Using GenStage for a Batching Pipeline
While developing a new app for data loss prevention (DLP) integration in Slack, I came across a problem where GenStage was the solution.
GenStage TL;DR
GenStage is a behavior that allows you to create a data transformation pipeline with back-pressure between stages. This lets you design a system that has a mechanism for dealing with demand, so it doesn't overload itself.
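As a toy illustration (not code from the app; the Counter and Printer names are mine), the smallest possible pipeline is a producer that only emits events when a consumer asks for them:
defmodule Counter do
  use GenStage

  def init(initial), do: {:producer, initial}

  # Called whenever a downstream stage asks for events; emit exactly `demand` integers
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule Printer do
  use GenStage

  def init(:ok), do: {:consumer, :ok}

  # Receives events in batches sized by the demand it issued upstream
  def handle_events(events, _from, state) do
    Enum.each(events, &IO.inspect/1)
    {:noreply, [], state}
  end
end

{:ok, producer} = GenStage.start_link(Counter, 0)
{:ok, consumer} = GenStage.start_link(Printer, :ok)
# The consumer never receives more than max_demand events at a time
GenStage.sync_subscribe(consumer, to: producer, max_demand: 10)
The producer does no work until demand arrives, which is the back-pressure mechanism in a nutshell.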
The Problem
The app relies on an external service to inspect content (Slack messages and files) for sensitive information. However, this service imposes limits on consumers: at most 10 requests per second, with at most 100 items inspected per request. To maximize throughput, I need to match those limits exactly, which works out to 10 requests × 100 items = 1,000 Slack messages analyzed per second.
GenStage provides the tools to solve this problem seamlessly.
The Solution
I've designed the pipeline with 3 stages:
- Stage 1 (The Collector) - The Collector is responsible for producing events to be consumed later in the pipeline, making it a producer in GenStage terminology. This stage is ultimately just a queue. Each item in the queue contains some metadata related to a Slack message or file, which I called an Event.
- Stage 2 (The Inspector) - The Inspector acts as a producer/consumer, demanding a maximum of 100 events from the Collector every 100ms and transforming them for the next stage. The events come in as a batch and are processed as a batch: information from each event is map-reduced into a single request that is sent to the external service for inspection. The response comes back with information about each event, so the events and their inspection results are zipped together and sent off to be processed by the last stage.
- Stage 3 (The Reporters) - The Reporters stage is the final stage in the pipeline, which makes it a consumer. This stage takes advantage of the ConsumerSupervisor behavior from GenStage, which makes it easy to create a "worker pool" where a new process is spawned for each event coming from the previous stage. This stage simply creates alerts and posts them in Slack.
Code
Here's the actual code from the pipeline, with some domain-specific processing left out.
- The Collector
defmodule CrimsonAegis.Batcher.Collector do
  @moduledoc """
  Buffers Slack events in a FIFO queue to be consumed by a later stage with GenStage.
  This process is a Producer for this GenStage pipeline.
  [**Collector**] <- Inspector <- Reporter
  """
  use GenStage

  alias CrimsonAegis.Batcher.Queue

  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:producer, Queue.new()}
  end

  # Gives events for the next stage to process when requested
  def handle_demand(demand, queue) when demand > 0 do
    {items, updated_queue} = Queue.take(queue, demand)
    {:noreply, items, updated_queue}
  end

  # Adds an event to the queue without emitting anything downstream
  def handle_cast({:push, event}, queue) do
    updated_queue = Queue.enqueue(queue, event)
    {:noreply, [], updated_queue}
  end

  @doc """
  Adds an event to the buffer queue.
  """
  def add(event) do
    GenStage.cast(__MODULE__, {:push, event})
  end
end
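The Queue module is a small FIFO abstraction that isn't shown in the post. A minimal sketch of what it might look like, assuming it simply wraps Erlang's :queue:
defmodule CrimsonAegis.Batcher.Queue do
  @moduledoc """
  Minimal FIFO queue. A sketch wrapping Erlang's :queue; the real module may differ.
  """

  def new, do: :queue.new()

  def enqueue(queue, item), do: :queue.in(item, queue)

  # Dequeues up to `count` items, returning {items, updated_queue}.
  # Returns fewer items if the queue runs empty.
  def take(queue, count), do: do_take(queue, count, [])

  defp do_take(queue, 0, acc), do: {Enum.reverse(acc), queue}

  defp do_take(queue, count, acc) do
    case :queue.out(queue) do
      {{:value, item}, rest} -> do_take(rest, count - 1, [item | acc])
      {:empty, rest} -> {Enum.reverse(acc), rest}
    end
  end
end
Slack events enter the pipeline from the outside via CrimsonAegis.Batcher.Collector.add/1.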
- The Inspector
defmodule CrimsonAegis.Batcher.Inspector do
  @moduledoc """
  Periodically takes a batch of items to be inspected and sends them off in a single request.
  In production, there are 10 of these processes. Each one asks for demand every second,
  and each is offset from the previous by 100ms. The staggering accounts for the latency
  of a single request.
  This stage acts as a Producer-Consumer in this GenStage pipeline.
  Collector <- [**Inspector**] <- Reporter
  """
  use GenStage

  alias CrimsonAegis.Batcher.Event

  def start_link(args) do
    GenStage.start_link(__MODULE__, args, name: args[:name])
  end

  def init(args) do
    # Demand size when asking (the external service allows at most 100 items per request)
    batch_size = args[:batch_size] || 100
    # Delay the first ask
    sync_offset = args[:sync_offset] || 0
    # Ask for events on a given interval
    interval = args[:inspect_interval]

    Process.send_after(self(), :ask, sync_offset)

    state = %{batch_size: batch_size, interval: interval}
    # Subscribe to the Collector as a producer/consumer
    {:producer_consumer, state, subscribe_to: [CrimsonAegis.Batcher.Collector]}
  end

  # Set the subscription to manual to control when to ask for events
  def handle_subscribe(:producer, _opts, from, state) do
    {:manual, Map.put(state, :producer, from)}
  end

  # Keep the subscriptions from downstream consumers automatic
  def handle_subscribe(:consumer, _, _, state) do
    {:automatic, state}
  end

  # Process the events coming from the producer
  def handle_events(events, _, state) do
    items = Enum.map(events, &transform_event/1)
    # The results are a list of maps, one per item
    results = DLP.inspect(items)
    # Zip each event with its inspection result for the next stage
    {:noreply, Enum.zip(events, results), state}
  end

  # Requests a certain number of items to process on a set interval
  def handle_info(:ask, %{batch_size: batch_size, interval: interval, producer: producer} = state) do
    # Request a batch of events with a max batch size
    GenStage.ask(producer, batch_size)
    # Schedule the next request
    Process.send_after(self(), :ask, interval)
    {:noreply, [], state}
  end

  # Transforms the event into meaningful data for a request
  defp transform_event(%Event{} = event) do
    # ...
  end
end
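The post doesn't show how the ten staggered Inspectors are started. Purely as a hypothetical sketch (the child specs, names, and option values below are mine, derived from the moduledoc above), the wiring could look like:
# Hypothetical wiring: 1 Collector, 10 Inspectors staggered by 100ms, 1 Reporter pool
collector = %{
  id: CrimsonAegis.Batcher.Collector,
  start: {CrimsonAegis.Batcher.Collector, :start_link, []}
}

inspectors =
  for i <- 0..9 do
    name = :"inspector_#{i}"

    %{
      id: name,
      start: {CrimsonAegis.Batcher.Inspector, :start_link,
              [[name: name, batch_size: 100, sync_offset: i * 100, inspect_interval: 1_000]]}
    }
  end

reporters = %{
  id: CrimsonAegis.Batcher.ReporterSupervisor,
  start: {CrimsonAegis.Batcher.ReporterSupervisor, :start_link,
          [[producers: Enum.map(0..9, &:"inspector_#{&1}")]]}
}

Supervisor.start_link([collector] ++ inspectors ++ [reporters], strategy: :one_for_one)
With all ten offsets in play, a batch request leaves every 100ms, hitting the 10-requests-per-second limit exactly.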
- The Reporter
defmodule CrimsonAegis.Batcher.ReporterSupervisor do
  @moduledoc """
  Supervisor for the Reporter pool.
  """
  use ConsumerSupervisor

  def start_link(args) do
    ConsumerSupervisor.start_link(__MODULE__, args)
  end

  def init(args) do
    children = [
      worker(CrimsonAegis.Batcher.Reporter, [], restart: :temporary)
    ]

    {:ok, children, strategy: :one_for_one, subscribe_to: args[:producers]}
  end
end
defmodule CrimsonAegis.Batcher.Reporter do
  @moduledoc """
  Takes an event and its findings and handles them accordingly by creating the
  appropriate alerts in Slack.
  A new process is spun up every time an event needs to be handled.
  This process acts as a Consumer in this GenStage pipeline.
  Collector <- Inspector <- [**Reporter**]
  """

  def start_link({event, %{"findings" => findings}}) do
    Task.start_link(fn ->
      findings
      |> some_biz_logic()
      |> CrimsonAegis.Slack.Utils.create_alert_for_findings(event.event, event.event_type)
    end)
  end
end
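One related knob worth knowing: with ConsumerSupervisor, the number of children running concurrently is capped by the subscription's max_demand. If the Reporter pool ever needed an explicit cap, a variant of the supervisor's init above (with an illustrative limit of 50) could look like:
def init(args) do
  children = [
    worker(CrimsonAegis.Batcher.Reporter, [], restart: :temporary)
  ]

  # max_demand bounds how many Reporter children may run at once per producer
  producers = Enum.map(args[:producers], &{&1, max_demand: 50})

  {:ok, children, strategy: :one_for_one, subscribe_to: producers}
end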
Wrap Up
Elixir's GenStage provides a nice toolset for creating a processing pipeline that can easily include batch processing, with very little code. Be sure to read the GenStage docs for more information.