As Staff Engineer on Lob’s Address Verification team, I was recently tasked with standing up an endpoint that supplies customers with all the data we have stored for a particular zip code. We use Elasticsearch pretty heavily to store our data, so this boiled down to querying ES by zip code and writing the data to Amazon S3. But both services have subtle limitations on how you read from and write to them: ES caps your result set at 10,000 records, and Amazon S3 insists that you write to your buckets in chunks of at least 4 megabytes. This turned out to be a perfect use case for Elixir streams!
Before we dive into the nitty-gritty of Elasticsearch and AWS S3, let’s first do a brief refresher on Elixir Streams.
What is a Stream? A Stream is a composable enumerable that is computed lazily. This is typically accomplished by maintaining a little bit of state that describes where you currently are in your enumerable and a method of computing the next item. A trivial example would be a Stream of natural numbers:
Stream.iterate(0, fn x -> x + 1 end)
The gory details of how this is implemented under the hood are a topic for another day. You rarely need to get that low level: the Stream module has a wealth of functions similar to those in Enum that you can use as building blocks to construct your own Streams. Several Elixir library functions also return streams natively: IO.stream will turn a file (or other source) into a Stream, allowing you to process very large files without first reading them into memory in their entirety. The lazy aspect of Streams means that only as much of the Stream is computed as is needed to generate your answer.
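To make the laziness concrete, here is a minimal sketch that composes a few Stream functions over an infinite stream. The variable names are only for illustration.

```elixir
# Building the pipeline does no work; each step just wraps the previous one.
squares_of_evens =
  0
  |> Stream.iterate(fn x -> x + 1 end)
  |> Stream.filter(fn x -> rem(x, 2) == 0 end)
  |> Stream.map(fn x -> x * x end)

# Work happens only when an Enum function asks for values, and only
# as much as is needed to produce five results.
first_five = Enum.take(squares_of_evens, 5)
IO.inspect(first_five)
```

Even though the source stream is infinite, Enum.take stops pulling values as soon as it has the five it asked for.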
Imagine we want to find the 100th natural number that is palindromic and also divisible by 3:
1
|> Stream.iterate(fn n -> n + 1 end)
|> Stream.filter(fn n -> s = Integer.to_string(n); s == String.reverse(s) end)
|> Stream.filter(fn n -> rem(n, 3) == 0 end)
|> Enum.at(99)
What I find elegant (and efficient) about this solution is that no number beyond the answer, 20202, will be computed. Neat!
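One way to check that claim is to count how many numbers actually flow through the pipeline. The sketch below adds a counting step backed by an Agent; the counter is an addition for illustration and not part of the original solution.

```elixir
{:ok, counter} = Agent.start_link(fn -> 0 end)

answer =
  1
  |> Stream.iterate(fn n -> n + 1 end)
  # Count every number that is pulled through the pipeline.
  |> Stream.each(fn _ -> Agent.update(counter, &(&1 + 1)) end)
  |> Stream.filter(fn n ->
    s = Integer.to_string(n)
    s == String.reverse(s)
  end)
  |> Stream.filter(fn n -> rem(n, 3) == 0 end)
  |> Enum.at(99)

generated = Agent.get(counter, & &1)
IO.inspect({answer, generated})
```

If the pipeline is as lazy as described, the counter should stop at the answer itself.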
If you’d like to dive in deeper, there’s a great Getting Started section on Enumerables and Streams on the Elixir language website. The Elixir API documentation is also chock full of instructive examples.
Now onto Elasticsearch. The first thing to know is that, by default, ES caps the size of a result set at 10,000 items. After scrutinizing the documentation, it became clear that the method that first popped up in my Stack Overflow searches (the Scroll API) is no longer recommended for deep pagination, and that the approved method is to use search_after with a point in time (PIT) ID. A PIT essentially gives you a snapshot of your index at one moment so that you get a consistent view of your search results across multiple queries. The process is:
acquire a PIT for your index
perform your queries using the PIT (without the index, which is now implicit in the PIT)
delete the PIT
That last step is crucial — PITs are extremely resource hungry so it is vital that you delete them as soon as you’re done. I borked our staging ES in my early experiments with PITs because I wasn’t deleting them and the expiration parameter seemed to have no effect.
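The acquire/query/delete steps above can be wrapped so that deletion always happens, even if the query code raises. This is a minimal sketch with hypothetical stand-ins for the real HTTP calls: PitLifecycleSketch, its create and delete functions, and the "addresses" index name are all made up for illustration.

```elixir
defmodule PitLifecycleSketch do
  # Hypothetical stand-ins for the real HTTP calls; they only record what happened.
  def create(_index) do
    send(self(), :pit_created)
    {:ok, "fake-pit-id"}
  end

  def delete(pit) do
    send(self(), {:pit_deleted, pit})
    :ok
  end

  # Runs `fun` with a fresh PIT and always deletes the PIT afterwards,
  # even if `fun` raises.
  def with_pit(index, fun) do
    case create(index) do
      {:ok, pit} ->
        try do
          {:ok, fun.(pit)}
        after
          delete(pit)
        end

      error ->
        error
    end
  end
end

result = PitLifecycleSketch.with_pit("addresses", fn pit -> "queried with " <> pit end)
IO.inspect(result)
```

The try/after block is what guarantees the cleanup; the rest is plumbing.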
The other thing we’d like to do is abstract away the underlying PIT and paging mechanism for the end user, so we provide an interface that just takes an ES query and generates a Stream of hits. Let’s start by creating and deleting the PIT.
Creating the PIT is just a matter of requesting one for the index that we’re interested in running queries against.
@spec create(ES.index_t()) :: {:ok, pit_t()} | {:error, ES.response_t()}
def create(index) do
  :post
  |> HTTP.request(url(index), HTTP.no_payload(), HTTP.json_encoding())
  |> HTTP.on_200(& &1["id"])
end
Deleting a PIT is a simple HTTP DELETE, except that PIT IDs are huge, so you need to supply them in the request body and not as URL params. Some HTTP libraries won’t let you do this. The HTTP spec is a little muddy on the matter, and last I checked on Stack Overflow there was a healthy debate on the subject. This is why we use HTTPoison: it allows payloads on DELETE requests.
@spec delete(pit_t()) :: :ok | {:error, any()}
def delete(pit) do
  url = HTTP.generate_url("_pit", ES.no_index(), HTTP.no_options())

  # The PIT ID goes in the request body because it is too long for a URL.
  with {:ok, payload} <- Poison.encode(%{"id" => pit}),
       %HTTPoison.Response{status_code: 200} <-
         HTTPoison.request!(:delete, url, payload, HTTP.headers(HTTP.json_encoding())) do
    :ok
  else
    # Either the encoding failed or ES answered with a non-200 response.
    error -> {:error, error}
  end
end
Now that the PITs are sorted out, our next order of business is figuring out how to use them in our queries. Our basic query is enhanced with three extra parameters:
size of 10,000 (the maximum allowed by Elasticsearch)
pit, a hash that contains %{id: <id>, keep_alive: "1m"}
sort, a hash that contains %{_shard_doc: "desc"}
(For sort, you need to provide something, and _shard_doc is available on every search that uses a PIT, so that’s nice and portable.)
@spec initial(PIT.pit_t(), ES.query_t()) :: ES.query_t()
def initial(pit, query) do
  %{
    query: query,
    pit: %{id: pit, keep_alive: PIT.expiration()},
    size: ES.max_results(),
    sort: %{"_shard_doc" => "desc"}
  }
end
With our basic query down, we can focus on how to generate the sequence of queries that will extract all the items that satisfy our search criteria. The trick here is that we feed into the next query the value of our sort field from the last hit of the previous result, like so:
@spec update(ES.query_t(), ES.response_t()) :: ES.query_t()
def update(query, %Req.Response{body: %{"hits" => %{"hits" => hits}}, status: 200}) do
  Map.put(query, :search_after, List.last(hits)["sort"])
end
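To see what these two functions produce, here is a sketch that builds the first query by hand and derives the second one from a trimmed-down response body. Every value in it (the zip_code field, the PIT ID, the sort values) is made up to show the shape of the data.

```elixir
# Hypothetical first query, in the shape described above.
first_query = %{
  query: %{term: %{"zip_code" => "94107"}},
  pit: %{id: "fake-pit-id", keep_alive: "1m"},
  size: 10_000,
  sort: %{"_shard_doc" => "desc"}
}

# A trimmed-down response body: every hit carries the sort values that placed it.
body = %{
  "hits" => %{
    "hits" => [
      %{"_id" => "a", "sort" => [4_294_967_299]},
      %{"_id" => "b", "sort" => [4_294_967_298]}
    ]
  }
}

# The next query is the first query plus the sort values of the last hit.
last_sort = body["hits"]["hits"] |> List.last() |> Map.get("sort")
next_query = Map.put(first_query, :search_after, last_sort)
IO.inspect(next_query)
```

Everything except search_after stays the same from page to page, which is why the update is always applied to the initial query.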
Armed with these two query-generating functions, we can use Stream.iterate to build a stream that will pull all the results out of our desired index:
@spec streamer(ES.query_t()) :: {:ok, Enumerable.t()} | {:error, any}
def streamer(initial) do
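  # Only the first search runs eagerly here, so the rescue below can only
  # catch its failure; later pages are fetched lazily by whoever consumes
  # the stream.
  {:ok,
   initial
   |> search()
   |> Stream.iterate(fn previous -> search(Query.update(initial, previous)) end)}
rescue
  error -> {:error, error}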
end
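The paging loop can be tried without a cluster. The sketch below replaces the real search with a fake that pages through an in-memory list; PagingSketch, its page size of 3, and its eight documents are assumptions made purely for illustration.

```elixir
defmodule PagingSketch do
  @page_size 3
  # Hypothetical data standing in for an index: ids 1..8, sorted ascending.
  @docs Enum.map(1..8, fn id -> %{"_id" => id, "sort" => [id]} end)

  # Fake search: returns up to @page_size docs that sort after `search_after`.
  def search(query) do
    after_value =
      case Map.get(query, :search_after) do
        [value] -> value
        nil -> 0
      end

    hits =
      @docs
      |> Enum.filter(fn doc -> hd(doc["sort"]) > after_value end)
      |> Enum.take(@page_size)

    %{"hits" => %{"hits" => hits}}
  end

  def update(query, %{"hits" => %{"hits" => hits}}) do
    Map.put(query, :search_after, List.last(hits)["sort"])
  end

  # An infinite stream of pages; the caller decides when to stop.
  def pages(initial) do
    initial
    |> search()
    |> Stream.iterate(fn previous -> search(update(initial, previous)) end)
  end
end

ids =
  %{query: %{match_all: %{}}}
  |> PagingSketch.pages()
  |> Stream.flat_map(fn page -> page["hits"]["hits"] end)
  |> Stream.take(8)
  |> Enum.map(fn hit -> hit["_id"] end)

IO.inspect(ids)
```

Note that the stream of pages never ends on its own, so something downstream (here Stream.take with the known number of hits) has to stop it.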
We need a high-level function that takes the results from this function and produces a Stream of hits. But there’s a wrinkle — we have to delete the PIT when we are done with the Stream. But how will we know? The solution is to pass in a consumer that consumes the Stream and then we delete the PIT afterward, like so:
@spec stream_many(ES.query_t(), ES.index_t(), ES.consumer_t(), non_neg_integer()) :: {:error, any()} | {:ok, Enumerable.t()}
def stream_many(query, index, consumer, count) do
  case PIT.create(index) do
    {:ok, pit_id} ->
      try do
        case pit_id |> Query.initial(query) |> streamer() do
          {:ok, stream} ->
            stream
            |> Stream.flat_map(& &1.body["hits"]["hits"])
            |> Stream.take(count)
            |> then(consumer)

          error ->
            error
        end
      after
        PIT.delete(pit_id)
      end

    error ->
      error
  end
rescue
  error -> {:error, error}
end
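For comparison, a different technique from the consumer callback used here is Stream.resource, which ties the cleanup to the stream itself: its third function runs when enumeration finishes or is halted early. The sketch below is not the approach taken in this article, and ResourceSketch with its fake PIT ID and canned pages is hypothetical.

```elixir
defmodule ResourceSketch do
  # Hypothetical stand-ins for creating a PIT, fetching pages, and deleting the PIT.
  defp open, do: {"fake-pit-id", [[1, 2, 3], [4, 5, 6], [7, 8]]}

  defp next_page({_pit, []} = state), do: {:halt, state}
  defp next_page({pit, [page | rest]}), do: {page, {pit, rest}}

  defp close({pit, _pages}), do: send(self(), {:pit_deleted, pit})

  # A stream of hits that cleans up after itself once it has been consumed.
  def hits do
    Stream.resource(&open/0, &next_page/1, &close/1)
  end
end

first_four = ResourceSketch.hits() |> Enum.take(4)
IO.inspect(first_four)
```

The trade-off is that the PIT now lives for as long as the caller takes to consume the stream, whereas the consumer callback keeps the whole lifetime inside one function call.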
But if our query returns no more than 10k results, we don’t need the whole PIT/sort/search_after machinery: one query will do the trick.
@spec stream_one(ES.query_t(), ES.index_t(), ES.consumer_t()) :: {:ok, any()} | {:error, any()}
def stream_one(query, index, consumer) do
  query
  |> search(index)
  |> HTTP.on_200(fn body -> consumer.(body["hits"]["hits"]) end)
end
Now we need a top-level function that queries the size of the result and chooses between stream_one and stream_many depending on the value, like so:
@spec stream(ES.query_t(), ES.index_t(), ES.consumer_t()) :: {:ok, any()} | {:error, any()}
def stream(query, index, consumer) do
  case count(query, index) do
    {:ok, count} ->
      if count > ES.max_results() do
        stream_many(query, index, consumer, count)
      else
        stream_one(query, index, consumer)
      end

    error ->
      error
  end
end
The library we use to access Amazon S3 is ex_aws_s3. It handles all the low-level details for us but it does have one requirement: the input data stream must consist of chunks of at least 4 megabytes.
To accomplish this, we use Stream.chunk_while, which takes four inputs:
the enumerable to be chunked
the initial value of the accumulator
the step function
a finalizer function
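Before applying this to S3, here is a minimal sketch of the four inputs at work on plain integers: it groups the numbers 1 to 10 into chunks whose sum is at least 10. The names step and finish are only for illustration.

```elixir
# The accumulator is {running_sum, items_in_reverse_order}.
step = fn n, {sum, items} ->
  if sum >= 10 do
    # Emit the finished chunk (restoring insertion order) and start a new one with `n`.
    {:cont, Enum.reverse(items), {n, [n]}}
  else
    {:cont, {sum + n, [n | items]}}
  end
end

# Flush whatever is left when the input runs out.
finish = fn
  {_sum, []} -> {:cont, {0, []}}
  {_sum, items} -> {:cont, Enum.reverse(items), {0, []}}
end

chunks = 1..10 |> Stream.chunk_while({0, []}, step, finish) |> Enum.to_list()
IO.inspect(chunks)
```

Every chunk except possibly the last reaches the threshold, which is the same guarantee we want for the parts of an upload.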
Our first decision is, what should the accumulator look like that gets passed forward by the step function? It obviously should contain the list of the items in the current chunk. But more subtly, it should also contain the size of the current chunk so we don’t waste resources recomputing it each time. So that gives us a two-element tuple containing an integer and a list.
Next, we turn our attention to the step function. It should check whether the current size is greater than or equal to the desired chunk size. If it is, we take the list of items from the accumulator, convert them into a chunk using convert, and start a fresh accumulator that holds just the incoming item; if not, we add the incoming item to the current chunk (and update the size) using add_chunk.
What should add_chunk do? Just push the item onto the front of the list and increase size by the item’s size.
The behaviour of convert depends on whether we care about preserving the order of the items in the chunk. Because we push each item onto the front of the list, the items end up in reverse order, so they need to be reversed before joining. But if we don’t care about order, we can skip that transformation. Putting this all together gives us:
@spec chunker(
        Enumerable.t(),
        non_neg_integer(),
        boolean(),
        (any() -> non_neg_integer()),
        (Enumerable.t() -> any())
      ) :: Enumerable.t()
def chunker(
      chunks,
      chunk_size,
      ordered \\ true,
      # S3 counts bytes, so measure bytes rather than graphemes
      sizer \\ &byte_size/1,
      joiner \\ &Enum.join/1
    ) do
  zero = {0, []}

  convert =
    if ordered do
      fn chunks -> chunks |> Enum.reverse() |> then(joiner) end
    else
      joiner
    end

  # Flush the leftover items; emit nothing if there are none (empty input).
  final = fn
    {_, []} -> {:cont, zero}
    {_, chunks} -> {:cont, convert.(chunks), zero}
  end

  add_chunk = fn {size, chunks}, chunk -> {size + sizer.(chunk), [chunk | chunks]} end

  step = fn chunk, accum = {size, chunks} ->
    if size >= chunk_size do
      {:cont, convert.(chunks), add_chunk.(zero, chunk)}
    else
      {:cont, add_chunk.(accum, chunk)}
    end
  end

  Stream.chunk_while(chunks, zero, step, final)
end
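The sizer argument decides what "size" means, and for an upload the number that matters is bytes. The short sketch below shows how String.length and byte_size can disagree on the same string; the sample text is arbitrary.

```elixir
# "héllo" written with an escape so the encoding is unambiguous.
text = "h\u00E9llo"

graphemes = String.length(text)
bytes = byte_size(text)

IO.inspect({graphemes, bytes})
```

For pure ASCII data the two agree, but JSON payloads often contain accented or non-Latin characters, so a byte-based sizer such as &byte_size/1 is the safer choice when the threshold is expressed in megabytes.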
We can now combine all the components we have written to produce a simple controller action that requests a stream of hits from our Elasticsearch module and chunks it into 4 MB pieces that satisfy the requirement of the ExAws.S3 module:
defmodule Controller do
  alias ExAws.S3

  @four_megabytes 4 * 1024 * 1024

  def find_all(params) do
    params |> query() |> ElasticSearch.stream(params.index, &consumer(params, &1))
  end

  def query(params) do
    %{term: %{params.query_term => params.query_value}}
  end

  # Encodes each hit as JSON, groups the results into chunks of at least
  # four megabytes, and uploads them to S3.
  def consumer(params, stream) do
    stream
    |> Stream.map(&Poison.encode!(&1["_source"]))
    |> chunker(@four_megabytes, false)
    |> S3.upload(params.bucket, params.filename)
    |> ExAws.request()
  end
end
And that wraps up our exploration of Elixir streams, Elasticsearch, and AWS S3. I hope you enjoyed it!
By Lob Staff Engineer, Guy Argo.