Elixir
Support for Elixir is provided through fluvio-ex, a community-supported project by @viniarck.
The following example creates a topic `lobby` with 1 partition and 1 replica.
alias Fluvio.Admin

# Start the admin process; the match fails unless it returns {:ok, pid}.
{:ok, pid} = Admin.start_link()

# Settings for the new topic: a single partition with a single replica.
topic_settings = %{partitions: 1, replication: 1}

# Create the "lobby" topic; the match fails unless the call returns {:ok, _}.
{:ok, _} = Admin.create_topic(pid, "lobby", topic_settings)
In this example, a `Producer` for the topic `lobby` is created. Then the message `hello` is sent to the topic and flushed. In addition, twenty values (`1` to `20`) are sent asynchronously in chunks of 10, with a flush after each chunk.
alias Fluvio.Producer

# Open a producer for the "lobby" topic.
{:ok, pid} = Producer.start_link(%{topic: "lobby"})

# Send a single record and flush it; each step must match {:ok, _}.
{:ok, _} = Producer.send(pid, "hello")
{:ok, _} = Producer.flush(pid)

# Send the values 1..20 in chunks of 10. Every value in a chunk is sent from
# its own Task, and the producer is flushed once all tasks of the chunk have
# been awaited. Only {{:error, _}, _} results are kept, so matching the final
# list against [] fails if any send or flush reported an error.
[] =
  1..20
  |> Stream.chunk_every(10)
  |> Stream.flat_map(fn batch ->
    pending =
      for value <- batch do
        Task.async(fn -> {Producer.send(pid, to_string(value)), value} end)
      end

    send_failures =
      pending
      |> Task.await_many()
      |> Enum.filter(fn
        {{:error, _msg}, _value} -> true
        _other -> false
      end)

    # The flush runs only after every send of this chunk has completed.
    flush_failures =
      Enum.filter([{Producer.flush(pid), :flush}], fn
        {{:error, _msg}, _tag} -> true
        _other -> false
      end)

    [send_failures, flush_failures]
  end)
  |> Stream.concat()
  |> Enum.to_list()
In this example, a `Consumer` for the topic `lobby` is created, starting from offset `0`. As records are received, the record contents are printed with `IO.inspect`.
alias Fluvio.Consumer

# Open a consumer for the "lobby" topic with the offset option
# [from_beginning: 0].
{:ok, pid} = Consumer.start_link(%{topic: "lobby", offset: [from_beginning: 0]})

# Handler applied to each streamed result: print the record on success,
# or the error message prefixed with "Error: " on failure.
print_result = fn result ->
  case result do
    {:ok, record} -> IO.inspect(record)
    {:error, msg} -> IO.inspect("Error: #{msg}")
  end
end

Consumer.stream_each(pid, print_result)