Skip to content

Commit

Permalink
Merge pull request #963 from somdoron/sockets
Browse files Browse the repository at this point in the history
add a way to send and receive routing keys
  • Loading branch information
somdoron authored Jan 28, 2021
2 parents 528e3aa + 1812271 commit 10a263b
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 17 deletions.
21 changes: 21 additions & 0 deletions src/NetMQ.Tests/RouterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,26 @@ public void Handover()
}
}
}

[Fact]
public void RoutingKeys()
{
using var router = new RouterSocket("inproc://routing-keys");
using var dealer = new DealerSocket("inproc://routing-keys");

dealer.SendRoutingKeys(new RoutingKey(1)).SendFrame("Hello");

var keys = router.ReceiveRoutingKeys();
var message = router.ReceiveFrameString();

Assert.Equal("Hello", message);

router.SendRoutingKeys(keys).SendFrame("World");

dealer.ReceiveRoutingKeys();
var reply = dealer.ReceiveFrameString();

Assert.Equal("World", reply);
}
}
}
15 changes: 0 additions & 15 deletions src/NetMQ/NetMQ-unix.csproj

This file was deleted.

83 changes: 83 additions & 0 deletions src/NetMQ/OutgoingSocketExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -685,5 +685,88 @@ public static bool TrySendFrame(this IOutgoingSocket socket, TimeSpan timeout, R
}

#endregion

#region Sending Routing Keys

/// <summary>
/// Send empty list of routing keys over <paramref name="socket"/>, append an empty message at the end of the keys.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
public static IOutgoingSocket SendEmptyRoutingKeys(this IOutgoingSocket socket)
{
return socket.SendMoreFrameEmpty();
}

/// <summary>
/// Send a single routing key over <paramref name="socket"/>, append an empty message afterwards.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
public static IOutgoingSocket SendRoutingKeys(this IOutgoingSocket socket, params RoutingKey[] routingKeys)
{
foreach(var routingKey in routingKeys)
socket.SendMoreFrame(routingKey);

socket.SendMoreFrameEmpty();

return socket;
}

/// <summary>
/// Send routing keys over <paramref name="socket"/>, append an empty message at the end of the keys.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="routingKeys">the routing keys to send</param>
public static IOutgoingSocket SendRoutingKeys(this IOutgoingSocket socket, IEnumerable<RoutingKey> routingKeys)
{
foreach(var routingKey in routingKeys)
socket.SendMoreFrame(routingKey);

socket.SendMoreFrameEmpty();

return socket;
}

/// <summary>
/// Attempt to transmit routing keys over <paramref name="socket"/>.
/// If message cannot be sent immediately, return <c>false</c>.
/// Routing is always sent as more frame.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="routingKeys">the routing keys to send</param>
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
public static bool TrySendRoutingKeys(this IOutgoingSocket socket, IEnumerable<RoutingKey> routingKeys)
{
return socket.TrySendRoutingKeys(TimeSpan.Zero, routingKeys);
}

/// <summary>
/// Attempt to transmit routing key over <paramref name="socket"/>.
/// If message cannot be sent within <paramref name="timeout"/>, return <c>false</c>.
/// Routing is always sent as more frame.
/// </summary>
/// <param name="socket">the IOutgoingSocket to transmit on</param>
/// <param name="timeout">The maximum period of time to try to send a message.</param>
/// <param name="routingKeys">the routing keys to send</param>
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
public static bool TrySendRoutingKeys(this IOutgoingSocket socket, TimeSpan timeout, IEnumerable<RoutingKey> routingKeys)
{
var enumerator = routingKeys.GetEnumerator();

// Empty collection, just trying to send the empty message
if (!enumerator.MoveNext())
return socket.TrySendFrameEmpty(timeout, true);

if (!socket.TrySendFrame(enumerator.Current))
return false;

while (enumerator.MoveNext())
socket.SendMoreFrame(enumerator.Current);

socket.SendMoreFrameEmpty();

return true;
}

#endregion
}
}
87 changes: 85 additions & 2 deletions src/NetMQ/ReceivingSocketExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public static string ReceiveFrameString(this IReceivingSocket socket, Encoding e
}
finally
{
msg.Close();
msg.Close();
}
}

Expand Down Expand Up @@ -449,7 +449,7 @@ public static bool TryReceiveFrameString(this IReceivingSocket socket, TimeSpan
}
finally
{
msg.Close();
msg.Close();
}
}

Expand Down Expand Up @@ -1111,5 +1111,88 @@ public static bool TryReceiveRoutingKey(this IReceivingSocket socket, TimeSpan t
}

#endregion

#region Receiving a routing keys

/// <summary>
/// Receive routing keys from <paramref name="socket"/> until a bottom message arrives (empty message), blocking until one arrives.
/// </summary>
/// <param name="socket">The socket to receive from.</param>
/// <returns>The routing keys.</returns>
public static IEnumerable<RoutingKey> ReceiveRoutingKeys(this IReceivingSocket socket)
{
List<RoutingKey> keys = new List<RoutingKey>();

while (true)
{
var routingKey = socket.ReceiveRoutingKey(out bool more);
if (!more)
throw new InvalidException("Malformed multipart message, empty message expected");

if (routingKey.Bytes.Length == 0)
break;

keys.Add(routingKey);
}

return keys;
}

/// <summary>
/// Attempt to receive routing-keys from <paramref name="socket"/>, an empty message expected at the end of routing keys.
/// If no message is immediately available, return <c>false</c>.
/// </summary>
/// <param name="socket">The socket to receive from.</param>
/// <param name="routingKeys">The routing-keys of the received message.</param>
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
public static bool TryReceiveRoutingKeys(this IReceivingSocket socket, [NotNullWhen(returnValue: true)] out IEnumerable<RoutingKey>? routingKeys)
{
return TryReceiveRoutingKeys(socket, TimeSpan.Zero, out routingKeys);
}

/// <summary>
/// Attempt to receive a routing-keys from <paramref name="socket"/>.
/// If no message is available within <paramref name="timeout"/>, return <c>false</c>.
/// </summary>
/// <param name="socket">The socket to receive from.</param>
/// <param name="timeout">The maximum period of time to wait for a message to become available.</param>
/// <param name="routingKeys">The routing-keys of the received message.</param>
/// <returns><c>true</c> if a message was available, otherwise <c>false</c>.</returns>
public static bool TryReceiveRoutingKeys(this IReceivingSocket socket, TimeSpan timeout, [NotNullWhen(returnValue: true)] out IEnumerable<RoutingKey>? routingKeys)
{
RoutingKey first = new RoutingKey();

if (socket.TryReceiveRoutingKey(timeout, ref first, out bool more))
{
if (!more)
throw new InvalidException("Malformed multipart message, empty message expected");

List<RoutingKey> keys = new List<RoutingKey>();
routingKeys = keys;

if (first.Bytes.Length == 0)
return true;

keys.Add(first);
while (true)
{
var routingKey = socket.ReceiveRoutingKey(out more);
if (!more)
throw new InvalidException("Malformed multipart message, empty message expected");

if (routingKey.Bytes.Length == 0)
break;

keys.Add(routingKey);
}

return true;
}

routingKeys = null;
return false;
}

#endregion
}
}
9 changes: 9 additions & 0 deletions src/NetMQ/RoutingKey.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ public RoutingKey(string b64)
bytes = Convert.FromBase64String(b64);
}

/// <summary>
/// Create a new routing key out of a Int64
/// </summary>
/// <param name="value"></param>
public RoutingKey(long value)
{
bytes = NetworkOrderBitsConverter.GetBytes(value);
}

internal byte[] Bytes
{
get { return bytes; }
Expand Down

0 comments on commit 10a263b

Please sign in to comment.