From 3b8dbb929f76046852c5eb88b5869f0fed3fdbe5 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Tue, 3 Dec 2019 17:02:36 -0600 Subject: [PATCH 01/32] added NetMQPollerTest.RemoveAndDisposeSocket() as a repro for issue #834 --- src/NetMQ.Tests/NetMQPollerTest.cs | 76 ++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index 059e45993..005b93f75 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -400,6 +400,82 @@ public void RemoveSocket() } } + [Fact] + public async void RemoveAndDisposeSocket() + { + //set up poller, start it + var patient = new NetMQPoller(); + patient.RunAsync(); + + Assert.True(patient.IsRunning); + + //create a pub-sub pair + var port = 55667; + var conn = $"tcp://127.0.0.1:{port}"; + + var pub = new PublisherSocket(); + pub.Bind(conn); + + var sub = new SubscriberSocket(); + sub.Connect(conn); + sub.SubscribeToAnyTopic(); + + //handle callbacks from poller thread + sub.ReceiveReady += (s, e) => + { + var msg = e.Socket.ReceiveFrameString(); + + Debug.WriteLine($"sub has data: {msg}"); + }; + + //add the subscriber socket to poller + patient.Add(sub); + + //set up pub on separate thread + var canceller = new CancellationTokenSource(); + + var pubAction = new Action(async () => + { + var token = canceller.Token; + + uint i = 0; + + while(!token.IsCancellationRequested) + { + pub.SendFrame($"Hello-{++i}"); + + // send ~ 5Hz + await Task.Delay(200); + } + }); + + //var pubThread = new Task(pubAction, canceller.Token, TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent); + //pubThread.Start(); + + var pubThread = Task.Run(pubAction); + + //allow a little time to run + await Task.Delay(5000); + + //now try to remove the sub from poller + patient.Remove(sub); + + // dispose the sub (this will cause exception on poller's worker-thread) and it can't be caught! + sub.Dispose(); + + //allow for poller to continue running + await Task.Delay(3000); + + patient.Stop(); + Assert.False(patient.IsRunning); + + canceller.Cancel(); + + //patient?.Dispose(); + pub?.Dispose(); + sub?.Dispose(); + } + [Fact] public void AddThrowsIfSocketAlreadyDisposed() { From d0f27255890c6ad10a089f3a52c6230e6b655d98 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Wed, 4 Dec 2019 11:40:36 -0600 Subject: [PATCH 02/32] Task refactor of .Remove(ISocketPollable) passes test! --- src/NetMQ.Tests/NetMQPollerTest.cs | 11 ++++++----- src/NetMQ/ISocketPollableCollection.cs | 3 ++- src/NetMQ/NetMQPoller.cs | 22 ++++++++++++++++++---- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index 005b93f75..4b83aae40 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -458,10 +458,11 @@ public async void RemoveAndDisposeSocket() await Task.Delay(5000); //now try to remove the sub from poller - patient.Remove(sub); + await patient.Remove(sub); // dispose the sub (this will cause exception on poller's worker-thread) and it can't be caught! sub.Dispose(); + sub = null; //allow for poller to continue running await Task.Delay(3000); @@ -471,9 +472,9 @@ public async void RemoveAndDisposeSocket() canceller.Cancel(); - //patient?.Dispose(); pub?.Dispose(); - sub?.Dispose(); + sub?.Dispose();// left here for dev testing when prior .Dispose() call may be commented out + patient?.Dispose(); } [Fact] @@ -499,7 +500,7 @@ public void AddThrowsIfSocketAlreadyDisposed() } [Fact] - public void RemoveThrowsIfSocketAlreadyDisposed() + public async void RemoveThrowsIfSocketAlreadyDisposed() { var socket = new RouterSocket(); @@ -511,7 +512,7 @@ public void RemoveThrowsIfSocketAlreadyDisposed() socket.Dispose(); // Remove throws if the removed socket - var ex = Assert.Throws(() => poller.Remove(socket)); + var ex = await Assert.ThrowsAsync(() => poller.Remove(socket)); Assert.StartsWith("Must not be disposed.", ex.Message); Assert.Equal("socket", ex.ParamName); diff --git a/src/NetMQ/ISocketPollableCollection.cs b/src/NetMQ/ISocketPollableCollection.cs index 0a5b45743..eba071386 100644 --- a/src/NetMQ/ISocketPollableCollection.cs +++ b/src/NetMQ/ISocketPollableCollection.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using JetBrains.Annotations; using NetMQ.Monitoring; @@ -13,7 +14,7 @@ namespace NetMQ public interface ISocketPollableCollection { void Add([NotNull] ISocketPollable socket); - void Remove([NotNull] ISocketPollable socket); + Task Remove([NotNull] ISocketPollable socket); void RemoveAndDispose(T socket) where T : ISocketPollable, IDisposable; } } \ No newline at end of file diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 620ec4fed..fe166eb88 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Net.Sockets; using System.Threading; +using System.Threading.Tasks; using JetBrains.Annotations; using NetMQ.Core.Utils; #if !NET35 @@ -112,12 +113,25 @@ protected override void QueueTask(Task task) m_tasksQueue.Enqueue(task); } - public void Run([NotNull] Action action) + public Task Run([NotNull] Action action) { + Task t = new Task(action); + if (!IsRunning || CanExecuteTaskInline) + { action(); + + var tcs = new TaskCompletionSource(); + tcs.SetResult(null); + + t = tcs.Task; + } else - new Task(action).Start(this); + { + t.Start(this); + } + + return t; } #else private void Run(Action action) @@ -221,7 +235,7 @@ public void Add([NotNull] Socket socket, [NotNull] Action callback) }); } - public void Remove(ISocketPollable socket) + public Task Remove(ISocketPollable socket) { if (socket == null) throw new ArgumentNullException(nameof(socket)); @@ -229,7 +243,7 @@ public void Remove(ISocketPollable socket) throw new ArgumentException("Must not be disposed.", nameof(socket)); CheckDisposed(); - Run(() => + return Run(() => { // Ensure the socket wasn't disposed while this code was waiting to be run on the poller thread if (socket.IsDisposed) From 53e1833a44ad86e1f8e6cdbd92d6590c22523a57 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Wed, 4 Dec 2019 11:58:58 -0600 Subject: [PATCH 03/32] speed up the test a little --- src/NetMQ.Tests/NetMQPollerTest.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index 4b83aae40..4fc4744be 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -455,7 +455,7 @@ public async void RemoveAndDisposeSocket() var pubThread = Task.Run(pubAction); //allow a little time to run - await Task.Delay(5000); + await Task.Delay(2000); //now try to remove the sub from poller await patient.Remove(sub); @@ -465,7 +465,7 @@ public async void RemoveAndDisposeSocket() sub = null; //allow for poller to continue running - await Task.Delay(3000); + await Task.Delay(2000); patient.Stop(); Assert.False(patient.IsRunning); From 68429d7a819b2cbe9c59f65a0c09be6df5845532 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Wed, 4 Dec 2019 16:26:10 -0600 Subject: [PATCH 04/32] async fix supporting .NET40, broke a test --- src/NetMQ.Tests/NetMQPollerTest.cs | 17 ++++++++++++++--- src/NetMQ/ISocketPollableCollection.cs | 2 +- src/NetMQ/NetMQ.csproj | 18 ++++++++++++++++++ src/NetMQ/NetMQPoller.cs | 4 ++-- 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index 4fc4744be..0de9aa0a6 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -458,7 +458,7 @@ public async void RemoveAndDisposeSocket() await Task.Delay(2000); //now try to remove the sub from poller - await patient.Remove(sub); + patient.Remove(sub); // dispose the sub (this will cause exception on poller's worker-thread) and it can't be caught! sub.Dispose(); @@ -500,7 +500,7 @@ public void AddThrowsIfSocketAlreadyDisposed() } [Fact] - public async void RemoveThrowsIfSocketAlreadyDisposed() + public void RemoveThrowsIfSocketAlreadyDisposed() { var socket = new RouterSocket(); @@ -512,8 +512,19 @@ public async void RemoveThrowsIfSocketAlreadyDisposed() socket.Dispose(); // Remove throws if the removed socket - var ex = await Assert.ThrowsAsync(() => poller.Remove(socket)); + //var ex = Assert.Throws(() => poller.Remove(socket)); + ArgumentException ex = null; + + try + { + poller.Remove(socket); + } + catch (ArgumentException argEx) + { + ex = argEx; + } + Assert.NotNull(ex); Assert.StartsWith("Must not be disposed.", ex.Message); Assert.Equal("socket", ex.ParamName); diff --git a/src/NetMQ/ISocketPollableCollection.cs b/src/NetMQ/ISocketPollableCollection.cs index eba071386..18e12689a 100644 --- a/src/NetMQ/ISocketPollableCollection.cs +++ b/src/NetMQ/ISocketPollableCollection.cs @@ -14,7 +14,7 @@ namespace NetMQ public interface ISocketPollableCollection { void Add([NotNull] ISocketPollable socket); - Task Remove([NotNull] ISocketPollable socket); + void Remove([NotNull] ISocketPollable socket); void RemoveAndDispose(T socket) where T : ISocketPollable, IDisposable; } } \ No newline at end of file diff --git a/src/NetMQ/NetMQ.csproj b/src/NetMQ/NetMQ.csproj index 16e5a837f..d5bbac897 100644 --- a/src/NetMQ/NetMQ.csproj +++ b/src/NetMQ/NetMQ.csproj @@ -130,4 +130,22 @@ + + + 1.0.168 + + + + + + 1.0.168 + + + + + + 1.0.168 + + + diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index fe166eb88..514d9cf8c 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -235,7 +235,7 @@ public void Add([NotNull] Socket socket, [NotNull] Action callback) }); } - public Task Remove(ISocketPollable socket) + public async void Remove(ISocketPollable socket) { if (socket == null) throw new ArgumentNullException(nameof(socket)); @@ -243,7 +243,7 @@ public Task Remove(ISocketPollable socket) throw new ArgumentException("Must not be disposed.", nameof(socket)); CheckDisposed(); - return Run(() => + await Run(() => { // Ensure the socket wasn't disposed while this code was waiting to be run on the poller thread if (socket.IsDisposed) From dea90f4e1d276a154bed513a42be0e54e1b0a152 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 7 Dec 2019 21:02:09 -0600 Subject: [PATCH 05/32] un-break the API back to sycnronous (but now working) --- src/NetMQ/NetMQPoller.cs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 514d9cf8c..a31938c50 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -235,15 +235,20 @@ public void Add([NotNull] Socket socket, [NotNull] Action callback) }); } - public async void Remove(ISocketPollable socket) + public void Remove(ISocketPollable socket) { if (socket == null) throw new ArgumentNullException(nameof(socket)); + + // JASells: not sure I agree with this thow. + // If trying to remove a disposed socket, why complain? It *might* work. The issue + // is if the poller's thread tries to actually service the socket before the remove call... + if (socket.IsDisposed) throw new ArgumentException("Must not be disposed.", nameof(socket)); CheckDisposed(); - await Run(() => + Run(() => { // Ensure the socket wasn't disposed while this code was waiting to be run on the poller thread if (socket.IsDisposed) @@ -257,7 +262,9 @@ await Run(() => socket.Socket.EventsChanged -= OnSocketEventsChanged; m_sockets.Remove(socket.Socket); m_isPollSetDirty = true; - }); + }) + .Wait(); + // keep the API syncronous by blocking the calling thread via Wait(), else RemoveThrowsIfSocketAlreadyDisposed() test fails } public void RemoveAndDispose(T socket) where T : ISocketPollable, IDisposable From 6b344ca8f10c780ea31b0247a190d9deac3a3107 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 7 Dec 2019 21:02:24 -0600 Subject: [PATCH 06/32] rename new test to avoid confusion --- src/NetMQ.Tests/NetMQPollerTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index 0de9aa0a6..fc48a3833 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -401,7 +401,7 @@ public void RemoveSocket() } [Fact] - public async void RemoveAndDisposeSocket() + public async void DisposeSocketAfterRemoval() { //set up poller, start it var patient = new NetMQPoller(); From c8798041811400a0096c3d5f26fd438f1936ce74 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 7 Dec 2019 21:21:21 -0600 Subject: [PATCH 07/32] fixed RemoveAndDispose() --- src/NetMQ.Tests/NetMQPollerTest.cs | 82 ++++++++++++++++++++++++++++-- src/NetMQ/NetMQPoller.cs | 25 ++------- 2 files changed, 83 insertions(+), 24 deletions(-) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index fc48a3833..25bd3564d 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -347,6 +347,9 @@ public void RemoveSocket() // identity e.Socket.SkipFrame(); + + //**Note: bad to assert from worker thread! + // If it fails, the test will crash, not report failure! Assert.Equal("Hello", e.Socket.ReceiveFrameString(out bool more)); Assert.False(more); @@ -400,6 +403,77 @@ public void RemoveSocket() } } + [Fact] + public async void RemoveAndDisposeSocket() + { + //set up poller, start it + var patient = new NetMQPoller(); + patient.RunAsync(); + + Assert.True(patient.IsRunning); + + //create a pub-sub pair + var port = 55667; + var conn = $"tcp://127.0.0.1:{port}"; + + var pub = new PublisherSocket(); + pub.Bind(conn); + + var sub = new SubscriberSocket(); + sub.Connect(conn); + sub.SubscribeToAnyTopic(); + + //handle callbacks from poller thread + sub.ReceiveReady += (s, e) => + { + var msg = e.Socket.ReceiveFrameString(); + + Debug.WriteLine($"sub has data: {msg}"); + }; + + //add the subscriber socket to poller + patient.Add(sub); + + //set up pub on separate thread + var canceller = new CancellationTokenSource(); + + var pubAction = new Action(async () => + { + var token = canceller.Token; + + uint i = 0; + + while (!token.IsCancellationRequested) + { + pub.SendFrame($"Hello-{++i}"); + + // send ~ 5Hz + await Task.Delay(200); + } + }); + + var pubThread = Task.Run(pubAction); + + //allow a little time to run + await Task.Delay(2000); + + //now try to remove the sub from poller + patient.RemoveAndDispose(sub); + + Assert.True(sub.IsDisposed); + + //allow for poller to continue running + await Task.Delay(2000); + + patient.Stop(); + Assert.False(patient.IsRunning); + + canceller.Cancel(); + + pub?.Dispose(); + patient?.Dispose(); + } + [Fact] public async void DisposeSocketAfterRemoval() { @@ -449,9 +523,6 @@ public async void DisposeSocketAfterRemoval() } }); - //var pubThread = new Task(pubAction, canceller.Token, TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent); - //pubThread.Start(); - var pubThread = Task.Run(pubAction); //allow a little time to run @@ -542,6 +613,11 @@ public void DisposeThrowsIfSocketAlreadyDisposed() // Dispose the socket. // It is incorrect to have a disposed socket in a poller. // Disposed sockets can throw into the poller's thread. + + //**JASells: And what does that have to do with removing one? Should check for disposed + // socket on Add, not Remove! Check only internally on to avoid accessing a potentially, + // disposed socket, but otherwise, removing it from the list is safe, and preferable to + // throwing exception since it makes the poller more resilient to unintended mis-use! socket.Dispose(); // Dispose throws if a polled socket is disposed diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index a31938c50..fe4f1b820 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -269,28 +269,11 @@ public void Remove(ISocketPollable socket) public void RemoveAndDispose(T socket) where T : ISocketPollable, IDisposable { - if (socket == null) - throw new ArgumentNullException(nameof(socket)); - if (socket.IsDisposed) - throw new ArgumentException("Must not be disposed.", nameof(socket)); - CheckDisposed(); + //call the remove method + Remove(socket); - Run(() => - { - // Ensure the socket wasn't disposed while this code was waiting to be run on the poller thread - if (socket.IsDisposed) - throw new InvalidOperationException( - $"{nameof(NetMQPoller)}.{nameof(RemoveAndDispose)} was called from a non-poller thread, " + - "so ran asynchronously. " + - $"The {socket.GetType().Name} being removed was disposed while the remove " + - $"operation waited to start on the poller thread. When using {nameof(RemoveAndDispose)} " + - "you should not dispose the pollable object ."); - - socket.Socket.EventsChanged -= OnSocketEventsChanged; - m_sockets.Remove(socket.Socket); - m_isPollSetDirty = true; - socket.Dispose(); - }); + //dispose of socket + socket.Dispose(); } public void Remove([NotNull] NetMQTimer timer) From 54c29317a5861273102171d24cdd76655b839b39 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 7 Dec 2019 21:34:59 -0600 Subject: [PATCH 08/32] fix other Remove methods --- src/NetMQ/NetMQPoller.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index fe4f1b820..9ade57768 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -284,7 +284,7 @@ public void Remove([NotNull] NetMQTimer timer) timer.When = -1; - Run(() => m_timers.Remove(timer)); + Run(() => m_timers.Remove(timer)).Wait(); } public void Remove([NotNull] Socket socket) @@ -297,7 +297,8 @@ public void Remove([NotNull] Socket socket) { m_pollinSockets.Remove(socket); m_isPollSetDirty = true; - }); + }) + .Wait(); } #endregion From bc362a95533dc332e59f8ed7f2b403e286785df5 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 7 Dec 2019 21:53:55 -0600 Subject: [PATCH 09/32] clean up the RUn method for .NET40 compat --- src/NetMQ/NetMQPoller.cs | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 9ade57768..30192be4c 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -115,24 +115,37 @@ protected override void QueueTask(Task task) public Task Run([NotNull] Action action) { - Task t = new Task(action); + Task t; if (!IsRunning || CanExecuteTaskInline) { action(); - var tcs = new TaskCompletionSource(); - tcs.SetResult(null); - - t = tcs.Task; + t = FromResult(null); } else { + t = new Task(action); t.Start(this); } return t; } + + /// + /// Provides a completed task with the result for a syncronously run action. + /// this only needed for .NET40. Depricated by in 4.5+ + /// + /// + /// + /// + private static Task FromResult(TResult result) + { + var tcs = new TaskCompletionSource(); + tcs.SetResult(result); + return tcs.Task; + } + #else private void Run(Action action) { @@ -140,7 +153,7 @@ private void Run(Action action) } #endif - #endregion +#endregion public NetMQPoller() { From 98a0f00cf3558be83de23e8ae3f91f23f6bb9267 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 7 Dec 2019 22:02:53 -0600 Subject: [PATCH 10/32] undo changes in RemoveThrowsIfSocketAlreadyDisposed --- src/NetMQ.Tests/NetMQPollerTest.cs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index 25bd3564d..3174d771a 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -583,19 +583,8 @@ public void RemoveThrowsIfSocketAlreadyDisposed() socket.Dispose(); // Remove throws if the removed socket - //var ex = Assert.Throws(() => poller.Remove(socket)); + var ex = Assert.Throws(() => poller.Remove(socket)); - ArgumentException ex = null; - - try - { - poller.Remove(socket); - } - catch (ArgumentException argEx) - { - ex = argEx; - } - Assert.NotNull(ex); Assert.StartsWith("Must not be disposed.", ex.Message); Assert.Equal("socket", ex.ParamName); From a3819e1c51437784b7c7f2aab221e52058cf0bf5 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 7 Dec 2019 22:14:13 -0600 Subject: [PATCH 11/32] fixing spacing complaints --- src/NetMQ.Tests/NetMQPollerTest.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index 3174d771a..4e2d4a965 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -347,7 +347,6 @@ public void RemoveSocket() // identity e.Socket.SkipFrame(); - //**Note: bad to assert from worker thread! // If it fails, the test will crash, not report failure! Assert.Equal("Hello", e.Socket.ReceiveFrameString(out bool more)); @@ -544,7 +543,6 @@ public async void DisposeSocketAfterRemoval() canceller.Cancel(); pub?.Dispose(); - sub?.Dispose();// left here for dev testing when prior .Dispose() call may be commented out patient?.Dispose(); } From 4f3b1e69fe627484d705a41526fb32e2e5555024 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 21 Dec 2019 07:17:51 -0600 Subject: [PATCH 12/32] return task from tests --- src/NetMQ.Tests/NetMQPollerTest.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index 4e2d4a965..b9254153a 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -403,7 +403,7 @@ public void RemoveSocket() } [Fact] - public async void RemoveAndDisposeSocket() + public async Task RemoveAndDisposeSocket() { //set up poller, start it var patient = new NetMQPoller(); @@ -474,7 +474,7 @@ public async void RemoveAndDisposeSocket() } [Fact] - public async void DisposeSocketAfterRemoval() + public async Task DisposeSocketAfterRemoval() { //set up poller, start it var patient = new NetMQPoller(); @@ -544,6 +544,7 @@ public async void DisposeSocketAfterRemoval() pub?.Dispose(); patient?.Dispose(); + canceller?.Dispose(); } [Fact] From 33816cb381288e2ec238da73dab25a8e39f6c3f9 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 21 Dec 2019 07:30:31 -0600 Subject: [PATCH 13/32] fixing wait() calls --- src/NetMQ/NetMQPoller.cs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 30192be4c..094080aee 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -297,7 +297,7 @@ public void Remove([NotNull] NetMQTimer timer) timer.When = -1; - Run(() => m_timers.Remove(timer)).Wait(); + Run(() => m_timers.Remove(timer)).GetAwaiter().GetResult(); } public void Remove([NotNull] Socket socket) @@ -308,10 +308,14 @@ public void Remove([NotNull] Socket socket) Run(() => { - m_pollinSockets.Remove(socket); - m_isPollSetDirty = true; + if (m_pollinSockets.Keys.Contains(socket)) + { + m_pollinSockets.Remove(socket); + m_isPollSetDirty = true; + } }) - .Wait(); + .GetAwaiter() + .GetResult(); } #endregion From bda8b3ff684c245dac7f048184507a246670d715 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 21 Dec 2019 07:45:25 -0600 Subject: [PATCH 14/32] use Task.Factory.StartNew --- src/NetMQ/NetMQPoller.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 094080aee..a6c102baa 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -125,8 +125,7 @@ public Task Run([NotNull] Action action) } else { - t = new Task(action); - t.Start(this); + t = Task.Factory.StartNew(action); } return t; @@ -255,7 +254,7 @@ public void Remove(ISocketPollable socket) // JASells: not sure I agree with this thow. // If trying to remove a disposed socket, why complain? It *might* work. The issue - // is if the poller's thread tries to actually service the socket before the remove call... + // is if the poller's thread tries to actually service the disposed socket before the remove call... if (socket.IsDisposed) throw new ArgumentException("Must not be disposed.", nameof(socket)); @@ -276,8 +275,9 @@ public void Remove(ISocketPollable socket) m_sockets.Remove(socket.Socket); m_isPollSetDirty = true; }) - .Wait(); - // keep the API syncronous by blocking the calling thread via Wait(), else RemoveThrowsIfSocketAlreadyDisposed() test fails + .GetAwaiter() + .GetResult(); + // keep the API syncronous blocking the calling thread here, else RemoveThrowsIfSocketAlreadyDisposed() test fails } public void RemoveAndDispose(T socket) where T : ISocketPollable, IDisposable From cb16939df2797b0e0ecd1ea2607afe104cbd5f75 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Tue, 18 Feb 2020 12:38:31 -0600 Subject: [PATCH 15/32] initial refactor works, except for one test. --- src/NetMQ.Tests/NetMQPollerTest.cs | 6 +- src/NetMQ/ISocketPollableCollection.cs | 2 +- src/NetMQ/ISocketPollableCollectionAsync.cs | 18 +++++ src/NetMQ/NetMQPoller.cs | 83 +++++++++++++++------ 4 files changed, 81 insertions(+), 28 deletions(-) create mode 100644 src/NetMQ/ISocketPollableCollectionAsync.cs diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index b9254153a..e1d5d7843 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -72,12 +72,12 @@ public void Monitoring() int port = rep.BindRandomPort("tcp://127.0.0.1"); reqMonitor.Connected += (s, e) => connectedEvent.Set(); + req.Connect("tcp://127.0.0.1:" + port); reqMonitor.AttachToPoller(poller); poller.RunAsync(); - req.Connect("tcp://127.0.0.1:" + port); req.SendFrame("a"); rep.SkipFrame(); @@ -457,7 +457,7 @@ public async Task RemoveAndDisposeSocket() await Task.Delay(2000); //now try to remove the sub from poller - patient.RemoveAndDispose(sub); + await patient.RemoveAndDisposeAsync(sub); Assert.True(sub.IsDisposed); @@ -528,7 +528,7 @@ public async Task DisposeSocketAfterRemoval() await Task.Delay(2000); //now try to remove the sub from poller - patient.Remove(sub); + await patient.RemoveAsync(sub); // dispose the sub (this will cause exception on poller's worker-thread) and it can't be caught! sub.Dispose(); diff --git a/src/NetMQ/ISocketPollableCollection.cs b/src/NetMQ/ISocketPollableCollection.cs index 18e12689a..94c208b57 100644 --- a/src/NetMQ/ISocketPollableCollection.cs +++ b/src/NetMQ/ISocketPollableCollection.cs @@ -1,5 +1,4 @@ using System; -using System.Threading.Tasks; using JetBrains.Annotations; using NetMQ.Monitoring; @@ -11,6 +10,7 @@ namespace NetMQ /// /// This interface provides an abstraction over the legacy Poller and newer classes for use in . /// + [Obsolete("this should be made internal, to avoid amjor re-work of NetMQMonitor, but prevent accidental mis-use from applications")] public interface ISocketPollableCollection { void Add([NotNull] ISocketPollable socket); diff --git a/src/NetMQ/ISocketPollableCollectionAsync.cs b/src/NetMQ/ISocketPollableCollectionAsync.cs new file mode 100644 index 000000000..004613e75 --- /dev/null +++ b/src/NetMQ/ISocketPollableCollectionAsync.cs @@ -0,0 +1,18 @@ +using System; +using System.Threading.Tasks; +using JetBrains.Annotations; + +namespace NetMQ +{ + /// + /// + /// + /// + /// This interface provides an abstraction over the legacy Poller and newer classes for use in and avoids thread sync issues removing sockets. + /// + public interface ISocketPollableCollectionAsync + { + Task RemoveAsync([NotNull] ISocketPollable socket); + Task RemoveAndDisposeAsync(T socket) where T : ISocketPollable, IDisposable; + } +} diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index a6c102baa..dbf10dbda 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -5,7 +5,6 @@ using System.Linq; using System.Net.Sockets; using System.Threading; -using System.Threading.Tasks; using JetBrains.Annotations; using NetMQ.Core.Utils; #if !NET35 @@ -26,7 +25,7 @@ public sealed class NetMQPoller : #if NET40 ISynchronizeInvoke, #endif - INetMQPoller, ISocketPollableCollection, IEnumerable, IDisposable + INetMQPoller, ISocketPollableCollection, ISocketPollableCollectionAsync, IEnumerable, IDisposable { private readonly List m_sockets = new List(); private readonly List m_timers = new List(); @@ -113,7 +112,7 @@ protected override void QueueTask(Task task) m_tasksQueue.Enqueue(task); } - public Task Run([NotNull] Action action) + internal Task RunAsync([NotNull] Action action) { Task t; @@ -131,6 +130,15 @@ public Task Run([NotNull] Action action) return t; } + [Obsolete("potentially launches a new task to execute the action, but provides no sync mechanism. Please use RemoveAsync() to remove sockets or timers")] + public void Run([NotNull] Action action) + { + if (!IsRunning || CanExecuteTaskInline) + action(); + else + new Task(action).Start(this); + } + /// /// Provides a completed task with the result for a syncronously run action. /// this only needed for .NET40. Depricated by in 4.5+ @@ -200,7 +208,7 @@ public void Add(ISocketPollable socket) throw new ArgumentException("Must not be disposed.", nameof(socket)); CheckDisposed(); - Run(() => + RunAsync(() => { // Ensure the socket wasn't disposed while this code was waiting to be run on the poller thread if (socket.IsDisposed) @@ -227,7 +235,7 @@ public void Add([NotNull] NetMQTimer timer) throw new ArgumentNullException(nameof(timer)); CheckDisposed(); - Run(() => m_timers.Add(timer)); + RunAsync(() => m_timers.Add(timer)); } public void Add([NotNull] Socket socket, [NotNull] Action callback) @@ -238,7 +246,7 @@ public void Add([NotNull] Socket socket, [NotNull] Action callback) throw new ArgumentNullException(nameof(callback)); CheckDisposed(); - Run(() => + RunAsync(() => { if (m_pollinSockets.ContainsKey(socket)) return; @@ -247,7 +255,14 @@ public void Add([NotNull] Socket socket, [NotNull] Action callback) }); } + [Obsolete("potentially launches a new task to execute the action, but provides no sync mechanism. Please use RemoveAsync() instead")] public void Remove(ISocketPollable socket) + { + // keeps the current public API intact, though flawed (no way to know when the task actually removes the socket). + RemoveAsync(socket); + } + + public Task RemoveAsync(ISocketPollable socket) { if (socket == null) throw new ArgumentNullException(nameof(socket)); @@ -260,7 +275,7 @@ public void Remove(ISocketPollable socket) throw new ArgumentException("Must not be disposed.", nameof(socket)); CheckDisposed(); - Run(() => + return RunAsync(() => { // Ensure the socket wasn't disposed while this code was waiting to be run on the poller thread if (socket.IsDisposed) @@ -274,19 +289,40 @@ public void Remove(ISocketPollable socket) socket.Socket.EventsChanged -= OnSocketEventsChanged; m_sockets.Remove(socket.Socket); m_isPollSetDirty = true; - }) - .GetAwaiter() - .GetResult(); - // keep the API syncronous blocking the calling thread here, else RemoveThrowsIfSocketAlreadyDisposed() test fails + }); } + [Obsolete("potentially launches a new task to execute the action, but provides no sync mechanism. Please use RemoveAndDisposeAsync() instead")] public void RemoveAndDispose(T socket) where T : ISocketPollable, IDisposable { - //call the remove method - Remove(socket); + // this implementation maintains *current* (flawed) behavior, since the task is not awaited + RemoveAndDisposeAsync(socket); + } + + public Task RemoveAndDisposeAsync(T socket) where T : ISocketPollable, IDisposable + { + if (socket == null) + throw new ArgumentNullException(nameof(socket)); + if (socket.IsDisposed) + throw new ArgumentException("Must not be disposed.", nameof(socket)); + CheckDisposed(); + + return RunAsync(() => + { + // Ensure the socket wasn't disposed while this code was waiting to be run on the poller thread + if (socket.IsDisposed) + throw new InvalidOperationException( + $"{nameof(NetMQPoller)}.{nameof(RemoveAndDispose)} was called from a non-poller thread, " + + "so ran asynchronously. " + + $"The {socket.GetType().Name} being removed was disposed while the remove " + + $"operation waited to start on the poller thread. When using {nameof(RemoveAndDispose)} " + + "you should not dispose the pollable object ."); - //dispose of socket - socket.Dispose(); + socket.Socket.EventsChanged -= OnSocketEventsChanged; + m_sockets.Remove(socket.Socket); + m_isPollSetDirty = true; + socket.Dispose(); + }); } public void Remove([NotNull] NetMQTimer timer) @@ -297,7 +333,7 @@ public void Remove([NotNull] NetMQTimer timer) timer.When = -1; - Run(() => m_timers.Remove(timer)).GetAwaiter().GetResult(); + RunAsync(() => m_timers.Remove(timer)); } public void Remove([NotNull] Socket socket) @@ -306,16 +342,14 @@ public void Remove([NotNull] Socket socket) throw new ArgumentNullException(nameof(socket)); CheckDisposed(); - Run(() => + RunAsync(() => { if (m_pollinSockets.Keys.Contains(socket)) { m_pollinSockets.Remove(socket); m_isPollSetDirty = true; } - }) - .GetAwaiter() - .GetResult(); + }); } #endregion @@ -329,7 +363,7 @@ public Task ContainsAsync([NotNull] ISocketPollable socket) CheckDisposed(); var tcs = new TaskCompletionSource(); - Run(() => tcs.SetResult(m_sockets.Contains(socket))); + RunAsync(() => tcs.SetResult(m_sockets.Contains(socket))); return tcs.Task; } @@ -340,7 +374,7 @@ public Task ContainsAsync([NotNull] NetMQTimer timer) CheckDisposed(); var tcs = new TaskCompletionSource(); - Run(() => tcs.SetResult(m_timers.Contains(timer))); + RunAsync(() => tcs.SetResult(m_timers.Contains(timer))); return tcs.Task; } @@ -351,7 +385,7 @@ public Task ContainsAsync([NotNull] Socket socket) CheckDisposed(); var tcs = new TaskCompletionSource(); - Run(() => tcs.SetResult(m_pollinSockets.ContainsKey(socket))); + RunAsync(() => tcs.SetResult(m_pollinSockets.ContainsKey(socket))); return tcs.Task; } #endif @@ -711,7 +745,8 @@ public void Dispose() #if !NET35 m_tasksQueue.Dispose(); #endif - + // JASells: this appears to be running prematurely in test NetMWPollerTests.Monitoring + // causing a objectDisposed exception in NetMQSelector.Select ~line 146 foreach (var socket in m_sockets) { if (socket.IsDisposed) From 544bb9437882149cd2fe0649235b2bc70986c5ee Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Tue, 18 Feb 2020 12:44:35 -0600 Subject: [PATCH 16/32] fix Selector to handle disposed sockets --- src/NetMQ/NetMQSelector.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NetMQ/NetMQSelector.cs b/src/NetMQ/NetMQSelector.cs index 349838b7c..4db9c9791 100644 --- a/src/NetMQ/NetMQSelector.cs +++ b/src/NetMQ/NetMQSelector.cs @@ -141,7 +141,7 @@ public bool Select([NotNull] Item[] items, int itemsCount, long timeout) selectItem.ResultEvent = PollEvents.None; - if (selectItem.Socket != null) + if (selectItem.Socket != null && !selectItem.Socket.IsDisposed) { var events = (PollEvents)selectItem.Socket.GetSocketOption(ZmqSocketOption.Events); From 87b764930d37d5d85b64bdde9107ed621ceb5d38 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Tue, 18 Feb 2020 12:45:48 -0600 Subject: [PATCH 17/32] added comment about previous commmit --- src/NetMQ/NetMQPoller.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index dbf10dbda..ed36ae391 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -747,6 +747,7 @@ public void Dispose() #endif // JASells: this appears to be running prematurely in test NetMWPollerTests.Monitoring // causing a objectDisposed exception in NetMQSelector.Select ~line 146 + // Fixed by adding socket.IsDisposed check in NetMQSelector.Select @line 144 foreach (var socket in m_sockets) { if (socket.IsDisposed) From 92163b13dde676fb44f4551a0b425adbc0d3a893 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Tue, 18 Feb 2020 13:25:19 -0600 Subject: [PATCH 18/32] removed comment --- src/NetMQ.Tests/NetMQPollerTest.cs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index e1d5d7843..4a51b3dde 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -601,11 +601,7 @@ public void DisposeThrowsIfSocketAlreadyDisposed() // Dispose the socket. // It is incorrect to have a disposed socket in a poller. // Disposed sockets can throw into the poller's thread. - - //**JASells: And what does that have to do with removing one? Should check for disposed - // socket on Add, not Remove! Check only internally on to avoid accessing a potentially, - // disposed socket, but otherwise, removing it from the list is safe, and preferable to - // throwing exception since it makes the poller more resilient to unintended mis-use! + socket.Dispose(); // Dispose throws if a polled socket is disposed From 51457d1d29a503cd6750d56e98ad0aaddf603cd6 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Tue, 18 Feb 2020 13:38:18 -0600 Subject: [PATCH 19/32] fix region tag formatting --- src/NetMQ/NetMQPoller.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index ed36ae391..5dfaaeeee 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -160,7 +160,7 @@ private void Run(Action action) } #endif -#endregion + #endregion public NetMQPoller() { From 38910d838acd0ec56faa6ac2301e730983e6e080 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 29 Feb 2020 08:12:34 -0600 Subject: [PATCH 20/32] updated comments --- src/NetMQ/NetMQPoller.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index a0efd8486..0bf4ac77a 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -820,10 +820,11 @@ public void Dispose() m_sockets.Remove(((ISocketPollable)m_tasksQueue).Socket); m_tasksQueue.Dispose(); #endif - // JASells: this appears to be running prematurely in test NetMWPollerTests.Monitoring + // JASells: this is running prematurely in test NetMWPollerTests.Monitoring // causing a objectDisposed exception in NetMQSelector.Select ~line 146 // Fixed by adding socket.IsDisposed check in NetMQSelector.Select @line 144. - // Similar check could be done in Poller to avoid servicing disposed sockets... there is already a null check. + // This check will also avoid servicing disposed sockets generally... + // and so we can remove the disposed checks on sockets in Remove, RemoveAsync, RemoveAndDispose, RemvoeAndDisposeASync foreach (var socket in m_sockets) { if (socket.IsDisposed) From 3bacfb8ef19fdd8514c0fd2880a989b229612718 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 29 Feb 2020 08:15:33 -0600 Subject: [PATCH 21/32] cleaer names for async socket remove tests --- src/NetMQ.Tests/NetMQPollerTest.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index 9bc20ce1c..3ff5c97f0 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -403,7 +403,7 @@ public void RemoveSocket() } [Fact] - public async Task RemoveAndDisposeSocket() + public async Task RemoveAndDisposeSocketAsync() { //set up poller, start it var patient = new NetMQPoller(); @@ -451,7 +451,7 @@ public async Task RemoveAndDisposeSocket() } }); - var pubThread = Task.Run(pubAction); + Task.Run(pubAction); //allow a little time to run await Task.Delay(2000); @@ -474,7 +474,7 @@ public async Task RemoveAndDisposeSocket() } [Fact] - public async Task DisposeSocketAfterRemoval() + public async Task DisposeSocketAfterAsyncRemoval() { //set up poller, start it var patient = new NetMQPoller(); From 455f6b066c6a775f91e51e4a2c88879bbe2b86c5 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Sat, 29 Feb 2020 08:29:24 -0600 Subject: [PATCH 22/32] I believe this should address the net45/net40 Task.FromResult() concern --- src/NetMQ/NetMQPoller.cs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 0bf4ac77a..2bf3922b4 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -153,20 +153,24 @@ public void Run([NotNull] Action action) /// /// Provides a completed task with the result for a syncronously run action. - /// this only needed for .NET40. Depricated by in 4.5+ + /// this provides a shim for 4.0 and 4.5 compatibility. /// /// /// /// private static Task FromResult(TResult result) { +#if NET40 var tcs = new TaskCompletionSource(); tcs.SetResult(result); return tcs.Task; +#else //NET45+ + return Task.FromResult(result); +#endif } -#else - private void Run(Action action) +#else //NET35 + private void Run(Action action) { action(); } From 52fa20edbc2e681cb83fa08dc69e8e1af3ddf53a Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Tue, 31 Mar 2020 11:02:57 -0500 Subject: [PATCH 23/32] ensure tasks are queued on the poller, not the task pool. --- src/NetMQ/NetMQPoller.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 2bf3922b4..60ff8b8e8 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -132,7 +132,8 @@ internal Task RunAsync([NotNull] Action action) } else { - t = Task.Factory.StartNew(action); + t = new Task(action); + t.Start(this); } return t; From 884d4ea82ba25c116eca8132c09d5ca2b2c6097e Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Tue, 31 Mar 2020 13:50:03 -0500 Subject: [PATCH 24/32] remove the dictionary checks. --- src/NetMQ/NetMQPoller.cs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 60ff8b8e8..77db28281 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -404,11 +404,8 @@ public void Remove([NotNull] Socket socket) RunAsync(() => { - if (m_pollinSockets.Keys.Contains(socket)) - { - m_pollinSockets.Remove(socket); - m_isPollSetDirty = true; - } + m_pollinSockets.Remove(socket); + m_isPollSetDirty = true; }); } From 9f17258c917a94422a87a7532aea3f29b3d0b58c Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Tue, 31 Mar 2020 14:45:13 -0500 Subject: [PATCH 25/32] fix indentation --- src/NetMQ/NetMQPoller.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 77db28281..1ec381f87 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -177,7 +177,7 @@ private void Run(Action action) } #endif - #endregion + #endregion /// /// Create a new NetMQPoller From 3ea0693765492dd2a98a0f898dcdc3c1d9fefdf6 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Tue, 7 Apr 2020 11:07:58 -0500 Subject: [PATCH 26/32] removed comment that needs seperate PR --- src/NetMQ/NetMQPoller.cs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 1ec381f87..bf496e6a0 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -310,10 +310,6 @@ public Task RemoveAsync(ISocketPollable socket) if (socket == null) throw new ArgumentNullException(nameof(socket)); - // JASells: not sure I agree with this thow. - // If trying to remove a disposed socket, why complain? It *might* get removed before the poller thread accesses it. - // The issue is if the poller's thread tries to actually service the disposed socket before the remove call... - if (socket.IsDisposed) throw new ArgumentException("Must not be disposed.", nameof(socket)); CheckDisposed(); From 72ebdbc314e8a331f6fb1ced2dd756e1156ef4ee Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Fri, 24 Apr 2020 08:43:08 -0500 Subject: [PATCH 27/32] added RemoveAsync(Socket ) overload --- src/NetMQ.Tests/NetMQPollerTest.cs | 54 ++++++++++++++++++++++++++++++ src/NetMQ/NetMQPoller.cs | 13 ++++++- 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index 3ff5c97f0..760066080 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -1007,6 +1007,60 @@ public void NativeSocket() } } + [Fact] + public async void NativeSocket_RemoveAsync() + { + using (var streamServer = new StreamSocket()) + using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)) + { + int port = streamServer.BindRandomPort("tcp://*"); + + socket.Connect(new IPEndPoint(IPAddress.Parse("127.0.0.1"), port)); + + var buffer = new byte[] { 1 }; + socket.Send(buffer); + + byte[] identity = streamServer.ReceiveFrameBytes(); + byte[] message = streamServer.ReceiveFrameBytes(); + + Assert.Equal(buffer[0], message[0]); + + var socketSignal = new ManualResetEvent(false); + + using (var poller = new NetMQPoller()) + { + poller.Add(socket, s => + { + socket.Receive(buffer); + + socketSignal.Set(); + }); + + poller.RunAsync(); + + // no message is waiting for the socket so it should fail + Assert.False(socketSignal.WaitOne(100)); + + // sending a message back to the socket + streamServer.SendMoreFrame(identity).SendFrame("a"); + + Assert.True(socketSignal.WaitOne(100)); + + socketSignal.Reset(); + + await poller.RemoveAsync(socket); + + // sending a message back to the socket + streamServer.SendMoreFrame(identity).SendFrame("a"); + + // we remove the native socket so it should fail + Assert.False(socketSignal.WaitOne(100)); + + poller.Stop(); + } + } + } + #endregion #region TaskScheduler tests diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index bf496e6a0..10e3d206f 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -392,13 +392,24 @@ public void Remove([NotNull] NetMQTimer timer) /// /// The socket to remove /// If socket is null + [Obsolete("Queues the action on the poller's thread, but provides no sync mechanism. Please use RemoveAsync() instead")] public void Remove([NotNull] Socket socket) + { + RemoveAsync(socket); + } + + /// + /// Remove the .Net socket from the poller + /// + /// The socket to remove + /// If socket is null + public Task RemoveAsync([NotNull] Socket socket) { if (socket == null) throw new ArgumentNullException(nameof(socket)); CheckDisposed(); - RunAsync(() => + return RunAsync(() => { m_pollinSockets.Remove(socket); m_isPollSetDirty = true; From c14f9f942d63ba0fca7a8492996a488c636ef836 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Fri, 24 Apr 2020 08:48:55 -0500 Subject: [PATCH 28/32] comment update --- src/NetMQ/NetMQPoller.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 10e3d206f..c145e93e2 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -392,7 +392,7 @@ public void Remove([NotNull] NetMQTimer timer) /// /// The socket to remove /// If socket is null - [Obsolete("Queues the action on the poller's thread, but provides no sync mechanism. Please use RemoveAsync() instead")] + [Obsolete("Queues an action on the poller's thread, but provides no sync mechanism. Please use RemoveAsync() instead")] public void Remove([NotNull] Socket socket) { RemoveAsync(socket); From 0a767c6b38a29223b525f368d9bb022680a25d36 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Fri, 24 Apr 2020 08:57:27 -0500 Subject: [PATCH 29/32] added REmoveASync(timer) overload and test --- src/NetMQ.Tests/NetMQPollerTest.cs | 55 ++++++++++++++++++++++++++++++ src/NetMQ/NetMQPoller.cs | 13 ++++++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/src/NetMQ.Tests/NetMQPollerTest.cs b/src/NetMQ.Tests/NetMQPollerTest.cs index 760066080..f10d8fdaa 100644 --- a/src/NetMQ.Tests/NetMQPollerTest.cs +++ b/src/NetMQ.Tests/NetMQPollerTest.cs @@ -710,6 +710,61 @@ public void RemoveTimer() } } + [Fact] + public async void RemoveTimer_Async() + { + using (var router = new RouterSocket()) + using (var dealer = new DealerSocket()) + using (var poller = new NetMQPoller { router }) + { + int port = router.BindRandomPort("tcp://127.0.0.1"); + + dealer.Connect("tcp://127.0.0.1:" + port); + + bool timerTriggered = false; + + var timer = new NetMQTimer(TimeSpan.FromMilliseconds(500)); + timer.Elapsed += (s, a) => { timerTriggered = true; }; + + // The timer will fire after 100ms + poller.Add(timer); + + bool messageArrived = false; + + router.ReceiveReady += (s, e) => + { + router.SkipFrame(); + router.SkipFrame(); + messageArrived = true; + //// Remove timer + //poller.Remove(timer); + }; + + poller.RunAsync(); + + Thread.Sleep(20); + + dealer.SendFrame("hello"); + + Thread.Sleep(300); + + try + { + await poller.RemoveAsync(timer); + } + catch (Exception ex) + { + //ensure no exceptions thrown + Assert.Null(ex); + } + + poller.Stop(); + + Assert.True(messageArrived); + Assert.False(timerTriggered); + } + } + [Fact] public void RunMultipleTimes() { diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index c145e93e2..1d292280b 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -376,7 +376,18 @@ public Task RemoveAndDisposeAsync(T socket) where T : ISocketPollable, IDispo /// /// The timer to remove /// If poller is null + [Obsolete("Queues an action on the poller's thread to remove the timer, but provides no sync mechanism. Please use RemoveAndDisposeAsync() instead")] public void Remove([NotNull] NetMQTimer timer) + { + RemoveAsync(timer); + } + + /// + /// Remove a timer from the poller + /// + /// The timer to remove + /// If poller is null + public Task RemoveAsync(NetMQTimer timer) { if (timer == null) throw new ArgumentNullException(nameof(timer)); @@ -384,7 +395,7 @@ public void Remove([NotNull] NetMQTimer timer) timer.When = -1; - RunAsync(() => m_timers.Remove(timer)); + return RunAsync(() => m_timers.Remove(timer)); } /// From fa5e49b6453f3f40dcb7768fa286468d8da9477d Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Fri, 24 Apr 2020 09:06:14 -0500 Subject: [PATCH 30/32] fix depricated message --- src/NetMQ/NetMQPoller.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 1d292280b..2ef716a0e 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -376,7 +376,7 @@ public Task RemoveAndDisposeAsync(T socket) where T : ISocketPollable, IDispo /// /// The timer to remove /// If poller is null - [Obsolete("Queues an action on the poller's thread to remove the timer, but provides no sync mechanism. Please use RemoveAndDisposeAsync() instead")] + [Obsolete("Queues an action on the poller's thread to remove the timer, but provides no sync mechanism. Please use RemoveAsync() instead")] public void Remove([NotNull] NetMQTimer timer) { RemoveAsync(timer); From 83a34b62f9444f4784a340d73e345f4d32e8f1d7 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Fri, 24 Apr 2020 09:22:33 -0500 Subject: [PATCH 31/32] refactored ContainsAsync() methods --- src/NetMQ/NetMQPoller.cs | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index 2ef716a0e..efb98f7ac 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -139,6 +139,23 @@ internal Task RunAsync([NotNull] Action action) return t; } + internal Task RunAsync([NotNull] Func action) + { + Task t; + + if (!IsRunning || CanExecuteTaskInline) + { + t = FromResult(action()); + } + else + { + t = new Task(action); + t.Start(this); + } + + return t; + } + /// /// Run an action on the Poller thread /// @@ -444,9 +461,7 @@ public Task ContainsAsync([NotNull] ISocketPollable socket) throw new ArgumentNullException(nameof(socket)); CheckDisposed(); - var tcs = new TaskCompletionSource(); - RunAsync(() => tcs.SetResult(m_sockets.Contains(socket))); - return tcs.Task; + return RunAsync(new Func(() => m_sockets.Contains(socket))); } /// @@ -460,9 +475,7 @@ public Task ContainsAsync([NotNull] NetMQTimer timer) throw new ArgumentNullException(nameof(timer)); CheckDisposed(); - var tcs = new TaskCompletionSource(); - RunAsync(() => tcs.SetResult(m_timers.Contains(timer))); - return tcs.Task; + return RunAsync(new Func(() => m_timers.Contains(timer))); } /// From 217e4f483ee2f4cfc59d4c259e164cb8c42f22c4 Mon Sep 17 00:00:00 2001 From: "joshua.sells" Date: Fri, 24 Apr 2020 11:05:42 -0500 Subject: [PATCH 32/32] added doc block --- src/NetMQ/NetMQPoller.cs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/NetMQ/NetMQPoller.cs b/src/NetMQ/NetMQPoller.cs index efb98f7ac..6dae52b1f 100644 --- a/src/NetMQ/NetMQPoller.cs +++ b/src/NetMQ/NetMQPoller.cs @@ -322,6 +322,13 @@ public void Remove(ISocketPollable socket) RemoveAsync(socket); } + /// + /// Remove a socket from the poller + /// + /// The socket to be removed + /// If socket is null + /// If socket is already disposed + /// If socket is getting disposed during the operation public Task RemoveAsync(ISocketPollable socket) { if (socket == null) @@ -362,6 +369,13 @@ public void RemoveAndDispose(T socket) where T : ISocketPollable, IDisposable RemoveAndDisposeAsync(socket); } + /// + /// Remove the socket from the poller and dispose the socket + /// + /// The socket to be removed + /// If socket is null + /// If socket is disposed + /// If socket got disposed during the operation public Task RemoveAndDisposeAsync(T socket) where T : ISocketPollable, IDisposable { if (socket == null)