Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poller.remove socket disposed fix #835

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3b8dbb9
added NetMQPollerTest.RemoveAndDisposeSocket() as a repro for issue #834
jasells Dec 3, 2019
d0f2725
Task refactor of .Remove(ISocketPollable) passes test!
jasells Dec 4, 2019
53e1833
speed up the test a little
jasells Dec 4, 2019
68429d7
async fix supporting .NET40, broke a test
jasells Dec 4, 2019
dea90f4
un-break the API back to sycnronous (but now working)
jasells Dec 8, 2019
6b344ca
rename new test to avoid confusion
jasells Dec 8, 2019
c879804
fixed RemoveAndDispose()
jasells Dec 8, 2019
54c2931
fix other Remove methods
jasells Dec 8, 2019
bc362a9
clean up the RUn method for .NET40 compat
jasells Dec 8, 2019
98a0f00
undo changes in RemoveThrowsIfSocketAlreadyDisposed
jasells Dec 8, 2019
a3819e1
fixing spacing complaints
jasells Dec 8, 2019
4f3b1e6
return task from tests
jasells Dec 21, 2019
33816cb
fixing wait() calls
jasells Dec 21, 2019
bda8b3f
use Task.Factory.StartNew
jasells Dec 21, 2019
cb16939
initial refactor works, except for one test.
jasells Feb 18, 2020
544bb94
fix Selector to handle disposed sockets
jasells Feb 18, 2020
87b7649
added comment about previous commmit
jasells Feb 18, 2020
92163b1
removed comment
jasells Feb 18, 2020
51457d1
fix region tag formatting
jasells Feb 18, 2020
fce2bb0
sync with master
jasells Feb 29, 2020
38910d8
updated comments
jasells Feb 29, 2020
3bacfb8
cleaer names for async socket remove tests
jasells Feb 29, 2020
455f6b0
I believe this should address the net45/net40 Task.FromResult() concern
jasells Feb 29, 2020
52fa20e
ensure tasks are queued on the poller, not the task pool.
jasells Mar 31, 2020
884d4ea
remove the dictionary checks.
jasells Mar 31, 2020
9f17258
fix indentation
jasells Mar 31, 2020
3ea0693
removed comment that needs seperate PR
jasells Apr 7, 2020
72ebdbc
added RemoveAsync(Socket ) overload
jasells Apr 24, 2020
c14f9f9
comment update
jasells Apr 24, 2020
0a767c6
added REmoveASync(timer) overload and test
jasells Apr 24, 2020
fa5e49b
fix depricated message
jasells Apr 24, 2020
83a34b6
refactored ContainsAsync() methods
jasells Apr 24, 2020
217e4f4
added doc block
jasells Apr 24, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 258 additions & 1 deletion src/NetMQ.Tests/NetMQPollerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ public void Monitoring()
int port = rep.BindRandomPort("tcp://127.0.0.1");

reqMonitor.Connected += (s, e) => connectedEvent.Set();
req.Connect("tcp://127.0.0.1:" + port);

reqMonitor.AttachToPoller(poller);

poller.RunAsync();

req.Connect("tcp://127.0.0.1:" + port);
req.SendFrame("a");

rep.SkipFrame();
Expand Down Expand Up @@ -347,6 +347,8 @@ public void RemoveSocket()
// identity
e.Socket.SkipFrame();

//**Note: bad to assert from worker thread!
// If it fails, the test will crash, not report failure!
Assert.Equal("Hello", e.Socket.ReceiveFrameString(out bool more));
Assert.False(more);

Expand Down Expand Up @@ -400,6 +402,151 @@ public void RemoveSocket()
}
}

[Fact]
public async Task RemoveAndDisposeSocketAsync()
{
//set up poller, start it
var patient = new NetMQPoller();
patient.RunAsync();

Assert.True(patient.IsRunning);

//create a pub-sub pair
var port = 55667;
var conn = $"tcp://127.0.0.1:{port}";

var pub = new PublisherSocket();
pub.Bind(conn);

var sub = new SubscriberSocket();
sub.Connect(conn);
sub.SubscribeToAnyTopic();

//handle callbacks from poller thread
sub.ReceiveReady += (s, e) =>
{
var msg = e.Socket.ReceiveFrameString();

Debug.WriteLine($"sub has data: {msg}");
};

//add the subscriber socket to poller
patient.Add(sub);

//set up pub on separate thread
var canceller = new CancellationTokenSource();

var pubAction = new Action(async () =>
{
var token = canceller.Token;

uint i = 0;

while (!token.IsCancellationRequested)
{
pub.SendFrame($"Hello-{++i}");

// send ~ 5Hz
await Task.Delay(200);
}
});

Task.Run(pubAction);

//allow a little time to run
await Task.Delay(2000);

//now try to remove the sub from poller
await patient.RemoveAndDisposeAsync(sub);

Assert.True(sub.IsDisposed);

//allow for poller to continue running
await Task.Delay(2000);

patient.Stop();
Assert.False(patient.IsRunning);

canceller.Cancel();

pub?.Dispose();
patient?.Dispose();
jasells marked this conversation as resolved.
Show resolved Hide resolved
}

[Fact]
public async Task DisposeSocketAfterAsyncRemoval()
{
//set up poller, start it
var patient = new NetMQPoller();
patient.RunAsync();

Assert.True(patient.IsRunning);

//create a pub-sub pair
var port = 55667;
var conn = $"tcp://127.0.0.1:{port}";

var pub = new PublisherSocket();
pub.Bind(conn);

var sub = new SubscriberSocket();
sub.Connect(conn);
sub.SubscribeToAnyTopic();

//handle callbacks from poller thread
sub.ReceiveReady += (s, e) =>
{
var msg = e.Socket.ReceiveFrameString();

Debug.WriteLine($"sub has data: {msg}");
};

//add the subscriber socket to poller
patient.Add(sub);

//set up pub on separate thread
var canceller = new CancellationTokenSource();

var pubAction = new Action(async () =>
{
var token = canceller.Token;

uint i = 0;

while(!token.IsCancellationRequested)
{
pub.SendFrame($"Hello-{++i}");

// send ~ 5Hz
await Task.Delay(200);
}
});

var pubThread = Task.Run(pubAction);
jasells marked this conversation as resolved.
Show resolved Hide resolved

//allow a little time to run
await Task.Delay(2000);

//now try to remove the sub from poller
await patient.RemoveAsync(sub);

// dispose the sub (this will cause exception on poller's worker-thread) and it can't be caught!
sub.Dispose();
sub = null;
jasells marked this conversation as resolved.
Show resolved Hide resolved

//allow for poller to continue running
await Task.Delay(2000);

patient.Stop();
Assert.False(patient.IsRunning);

canceller.Cancel();

pub?.Dispose();
patient?.Dispose();
jasells marked this conversation as resolved.
Show resolved Hide resolved
canceller?.Dispose();
}

[Fact]
public void AddThrowsIfSocketAlreadyDisposed()
{
Expand Down Expand Up @@ -453,6 +600,7 @@ public void DisposeThrowsIfSocketAlreadyDisposed()
// Dispose the socket.
// It is incorrect to have a disposed socket in a poller.
// Disposed sockets can throw into the poller's thread.

socket.Dispose();

// Dispose throws if a polled socket is disposed
Expand Down Expand Up @@ -562,6 +710,61 @@ public void RemoveTimer()
}
}

[Fact]
public async void RemoveTimer_Async()
{
using (var router = new RouterSocket())
using (var dealer = new DealerSocket())
using (var poller = new NetMQPoller { router })
{
int port = router.BindRandomPort("tcp://127.0.0.1");

dealer.Connect("tcp://127.0.0.1:" + port);

bool timerTriggered = false;

var timer = new NetMQTimer(TimeSpan.FromMilliseconds(500));
timer.Elapsed += (s, a) => { timerTriggered = true; };

// The timer will fire after 100ms
poller.Add(timer);

bool messageArrived = false;

router.ReceiveReady += (s, e) =>
{
router.SkipFrame();
router.SkipFrame();
messageArrived = true;
//// Remove timer
//poller.Remove(timer);
};

poller.RunAsync();

Thread.Sleep(20);

dealer.SendFrame("hello");

Thread.Sleep(300);

try
{
await poller.RemoveAsync(timer);
}
catch (Exception ex)
{
//ensure no exceptions thrown
Assert.Null(ex);
}

poller.Stop();

Assert.True(messageArrived);
Assert.False(timerTriggered);
}
}

[Fact]
public void RunMultipleTimes()
{
Expand Down Expand Up @@ -859,6 +1062,60 @@ public void NativeSocket()
}
}

[Fact]
public async void NativeSocket_RemoveAsync()
{
using (var streamServer = new StreamSocket())
using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
{
int port = streamServer.BindRandomPort("tcp://*");

socket.Connect(new IPEndPoint(IPAddress.Parse("127.0.0.1"), port));

var buffer = new byte[] { 1 };
socket.Send(buffer);

byte[] identity = streamServer.ReceiveFrameBytes();
byte[] message = streamServer.ReceiveFrameBytes();

Assert.Equal(buffer[0], message[0]);

var socketSignal = new ManualResetEvent(false);

using (var poller = new NetMQPoller())
{
poller.Add(socket, s =>
{
socket.Receive(buffer);

socketSignal.Set();
});

poller.RunAsync();

// no message is waiting for the socket so it should fail
Assert.False(socketSignal.WaitOne(100));

// sending a message back to the socket
streamServer.SendMoreFrame(identity).SendFrame("a");

Assert.True(socketSignal.WaitOne(100));

socketSignal.Reset();

await poller.RemoveAsync(socket);

// sending a message back to the socket
streamServer.SendMoreFrame(identity).SendFrame("a");

// we remove the native socket so it should fail
Assert.False(socketSignal.WaitOne(100));

poller.Stop();
}
}
}

#endregion

#region TaskScheduler tests
Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ/ISocketPollableCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace NetMQ
/// <remarks>
/// This interface provides an abstraction over the legacy Poller and newer <see cref="NetMQPoller"/> classes for use in <see cref="NetMQMonitor"/>.
/// </remarks>
[Obsolete("Use INetMQPoller instead")]
[Obsolete("Use INetMQPoller instead. This should be made internal, to avoid major re-work of NetMQMonitor, but prevent accidental mis-use from applications")]
public interface ISocketPollableCollection
{
/// <summary>
Expand Down
18 changes: 18 additions & 0 deletions src/NetMQ/ISocketPollableCollectionAsync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Threading.Tasks;
using JetBrains.Annotations;

namespace NetMQ
{
/// <summary>
///
/// </summary>
/// <remarks>
/// This interface provides an abstraction over the legacy Poller and newer <see cref="NetMQPoller"/> classes for use in <see cref="NetMQMonitor"/> and avoids thread sync issues removing sockets.
/// </remarks>
public interface ISocketPollableCollectionAsync
{
Task RemoveAsync([NotNull] ISocketPollable socket);
Task RemoveAndDisposeAsync<T>(T socket) where T : ISocketPollable, IDisposable;
}
}
39 changes: 39 additions & 0 deletions src/NetMQ/NetMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,43 @@
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>

<!--
<ItemGroup Condition=" '$(TargetFramework)' == 'monoandroid60' ">
<Reference Include="mscorlib" />
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.ServiceModel" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'xamarinios10' ">
<Reference Include="mscorlib" />
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.ServiceModel" />
</ItemGroup>
-->

<ItemGroup>
<PackageReference Include="SourceLink.Create.CommandLine" Version="2.7.4" PrivateAssets="All" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net47'">
<PackageReference Include="Microsoft.Bcl.Async">
<Version>1.0.168</Version>
</PackageReference>
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net40'">
<PackageReference Include="Microsoft.Bcl.Async">
<Version>1.0.168</Version>
</PackageReference>
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="Microsoft.Bcl.Async">
<Version>1.0.168</Version>
</PackageReference>
</ItemGroup>

</Project>
Loading