Skip to content

Commit

Permalink
Add test for events just after subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyvanriet authored and svrdlans committed Jan 31, 2024
1 parent 01cc4d1 commit 712e27c
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 8 deletions.
2 changes: 1 addition & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ config :ex_unit,

config :extreme, TestConn,
db_type: "node",
host: "localhost",
host: System.get_env("EVENTSTORE_HOST") || "localhost",
port: "1113",
username: "admin",
password: "changeit",
Expand Down
3 changes: 3 additions & 0 deletions lib/extreme/request_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ defmodule Extreme.RequestManager do

{:ok, subscription} = fun.(correlation_id)

# TODO this guarantees several stream events will appear before the registration
# need to find a way to accomplish this from the test itself.
Process.sleep(50)
GenServer.cast(req_manager, {:register_subscription, correlation_id, subscription})

GenServer.reply(from, {:ok, subscription})
Expand Down
80 changes: 73 additions & 7 deletions test/subscriptions_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ defmodule ExtremeSubscriptionsTest do
end

def handle_call({:on_event, event} = message, _from, state) do
IO.puts("Subscriber {:on_event, event} (#{inspect(message)})")
send(state.sender, message)
{:reply, :ok, %{state | received: [event | state.received]}}
end

def handle_call({:on_event, event, _correlation_id} = message, _from, state) do
IO.puts("Subscriber {:on_event, event, _correlation_id} (#{inspect(message)})")
send(state.sender, message)
{:reply, :ok, %{state | received: [event | state.received]}}
end
Expand Down Expand Up @@ -526,9 +528,9 @@ defmodule ExtremeSubscriptionsTest do
Helpers.assert_no_leaks(TestConn)
end

test "events written while reading stream are also pushed to client in correct order" do
test "events written while subscribing are also pushed to client in correct order" do
stream = Helpers.random_stream_name()
num_events = 1_000
num_events = 200
# prepopulate stream
events1 =
1..num_events
Expand All @@ -538,21 +540,85 @@ defmodule ExtremeSubscriptionsTest do
1..num_events
|> Enum.map(fn x -> %Event.PersonCreated{name: "Name #{x + num_events}"} end)

{:ok, %ExMsg.WriteEventsCompleted{}} =
TestConn.execute(Helpers.write_events(stream, events1))
Enum.each(events1, fn e ->
{:ok, %ExMsg.WriteEventsCompleted{}} = TestConn.execute(Helpers.write_events(stream, [e]))
end)

# bombard the stream with individual event writes in the background
spawn(fn ->
Enum.each(events2, fn e ->
{:ok, %ExMsg.WriteEventsCompleted{}} =
TestConn.execute(Helpers.write_events(stream, [e]))
end)

Logger.debug("Second pack of events written")
end)

# subscribe to existing stream
{:ok, subscriber} = Subscriber.start_link()

{:ok, subscription} = TestConn.read_and_stay_subscribed(stream, subscriber, 0, 2)

Logger.debug("Second pack of events written")

# assert first events are received
for _ <- 1..num_events, do: assert_receive({:on_event, _event})

Logger.debug("First pack of events received")

# assert second pack of events is received as well
for _ <- 1..num_events, do: assert_receive({:on_event, _event}, 1_000)

# assert :caught_up is received when existing events are read
assert_receive :caught_up

# check if events came in correct order.
assert Subscriber.received_events(subscriber) == events1 ++ events2

{:ok, %ExMsg.ReadStreamEventsCompleted{} = response} =
TestConn.execute(Helpers.read_events(stream, 0, 2_000))

assert events1 ++ events2 ==
Enum.map(response.events, fn event -> :erlang.binary_to_term(event.event.data) end)

Helpers.unsubscribe(TestConn, subscription)
end

test "events written just after subscription starts are also pushed to client in correct order" do
stream = Helpers.random_stream_name()
num_events = 200
# prepopulate stream
events1 =
1..num_events
|> Enum.map(fn x -> %Event.PersonCreated{name: "Name #{x}"} end)

events2 =
1..num_events
|> Enum.map(fn x -> %Event.PersonCreated{name: "Name #{x + num_events}"} end)

Enum.each(events1, fn e ->
{:ok, %ExMsg.WriteEventsCompleted{}} = TestConn.execute(Helpers.write_events(stream, [e]))
end)

spawn(fn ->
{:ok, %ExMsg.WriteEventsCompleted{}} =
TestConn.execute(Helpers.write_events(stream, events2))
Enum.each(events2, fn e ->
{:ok, %ExMsg.WriteEventsCompleted{}} =
TestConn.execute(Helpers.write_events(stream, [e]))
end)

Logger.debug("Second pack of events written")
end)

Process.sleep(100)

# subscribe to existing stream
{:ok, subscriber} = Subscriber.start_link()

# TODO make it more likely or guaranteed that the race would fail

{:ok, subscription} = TestConn.read_and_stay_subscribed(stream, subscriber, 0, 2)

Logger.debug("Second pack of events written")

# assert first events are received
for _ <- 1..num_events, do: assert_receive({:on_event, _event})

Expand Down

0 comments on commit 712e27c

Please sign in to comment.