Skip to content

Commit

Permalink
Take use of lease to ensure only a single instance of migration tool …
Browse files Browse the repository at this point in the history
…can upload the generated files to the destination folder

Description:

  In a rare case when two or more instances of the migration tool ("assets" or "storage" command) want to upload the generated files
  into the same destination folder, this change will ensure only a single instance of the tool is allowed to upload files, the other instances of
  tool would just skip the process of the asset immediately.

  The change takes use of the lease of blob __migrate inside the output folder to ensure above behavior.

  For any given input asset, once the destination folder is figured out based on the command line options, before it starts to do
  the actual migrating and uploading work, the code will check the lease of that blob, if it can acquire the lease,
  It will update the blob's modification time and keep holding the lease, continue the migrating work,
  the code then releases the lease after the work is done, no matter it is succeed or failed.

  If one previous tool has been holding the lease and didn't release it due to crash or ctrl+c, the new logic can detect it by checking
  the last modification time of the migration lease blob, if the lease was held for too long time, such as 12 hours, the code will break the lease
  and re-acquire the lease with the new lease-Id.

  When the tool has acquired the lease, besides the update of the modification time of the blob, it also tries to check if the output folder contains
  media files that might be generated by previous tool, those media files are all deleted before the new run of the migration tool so that container
  won't contain the mixed media files that are generated by two or more different instances of migration tool.

  The behavior is verified on both Windows and WSL environment for "assets" and "storage" command.
  • Loading branch information
weibz committed Jul 6, 2023
1 parent 48762a7 commit 1352823
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 29 deletions.
8 changes: 5 additions & 3 deletions Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
}
},
"WSL": {
"commandName": "WSL2",
"commandLineArgs": "AMSMigrate.dll assets -v debug -s b2c12710-ecdf-4532-9ba6-7e74a219b5f2 -g provenance -n provenanceuswc --storage-account amsencodermsitest -t $web/linux/${AssetName} -f \"name eq 'uploaded-47efd87d00'\"",
"distributionName": ""
"commandName": "WSL2",
"commandLineArgs": "AMSMigrate.dll storage -s b2c12710-ecdf-4532-9ba6-7e74a219b5f2 -g pohhsuTest -n pohhsumediaservice -o pohhsumediaservice -t generated-output/tx-${AssetName} --prefix tearofsteel-ed541c0dbb-StandardEncoder-H264MultipleBitrate1080p-5e8f356300 --skip-migrated false ",

"commandLineArgs_Assets": "AMSMigrate.dll assets -s b2c12710-ecdf-4532-9ba6-7e74a219b5f2 -g pohhsuTest -n pohhsumediaservice -o pohhsumediaservice -t generated-output/tx-${AssetName} -f \"name eq 'tearofsteel-ed541c0dbb-StandardEncoder-H264MultipleBitrate1080p-5e8f356300'\" --skip-migrated false ",
"distributionName": ""
}
}
}
50 changes: 39 additions & 11 deletions ams/AssetMigrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,30 +185,58 @@ public async Task<MigrationResult> MigrateAsync(

if (result.IsSupportedAsset)
{
foreach (var transform in _transformFactory.AssetTransforms)
var uploader = _transformFactory.Uploader;
var (Container, Path) = _transformFactory.TemplateMapper.ExpandAssetTemplate(
record.Asset,
_options.PathTemplate);

var canUpload = await uploader.CanUploadAsync(
Container,
Path,
cancellationToken);

if (canUpload)
{
var transformResult = (AssetMigrationResult)await transform.RunAsync(record, cancellationToken);
try
{
foreach (var transform in _transformFactory.AssetTransforms)
{
var transformResult = (AssetMigrationResult)await transform.RunAsync(record, cancellationToken);

result.Status = transformResult.Status;
result.OutputPath = transformResult.OutputPath;
result.Status = transformResult.Status;
result.OutputPath = transformResult.OutputPath;

if (result.Status == MigrationStatus.Failed)
if (result.Status == MigrationStatus.Failed)
{
break;
}
}
}
finally
{
break;
await uploader.UploadCleanupAsync(Container, Path, cancellationToken);
}
}
else
{
//
// Another instance of the tool is working on the output container,
//
result.Status = MigrationStatus.Skipped;

_logger.LogWarning("Another tool is working on the container {container} and output path: {output}",
Container,
Path);
}
}
else
{
// The asset type is not supported in this milestone,
// Mark the status as Skipped for caller to do the statistics.
result.Status = MigrationStatus.Skipped;
}
}

if (result.Status == MigrationStatus.Skipped)
{
_logger.LogWarning("Skipping asset {name} because it is not in a supported format!!!", asset.Data.Name);
_logger.LogWarning("Skipping asset {name} because it is not in a supported format!!!", asset.Data.Name);
}
}

if (_options.MarkCompleted)
Expand Down
46 changes: 39 additions & 7 deletions ams/StorageMigrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,26 +155,58 @@ private async Task<MigrationResult> MigrateAsync(

if (result.IsSupportedAsset)
{
var transforms = _transformFactory.StorageTransforms;
var uploader = _transformFactory.Uploader;
var (Container, Path) = _transformFactory.TemplateMapper.ExpandPathTemplate(
assetDetails.Container,
_storageOptions.PathTemplate);

foreach (var transform in transforms)
var canUpload = await uploader.CanUploadAsync(
Container,
Path,
cancellationToken);
if (canUpload)
{
var transformResult = await transform.RunAsync(assetDetails, cancellationToken);
try
{
var transforms = _transformFactory.StorageTransforms;

foreach (var transform in transforms)
{
var transformResult = await transform.RunAsync(assetDetails, cancellationToken);

result.Status = transformResult.Status;
result.OutputPath = transformResult.OutputPath;
result.Status = transformResult.Status;
result.OutputPath = transformResult.OutputPath;

if (result.Status == MigrationStatus.Failed)
if (result.Status == MigrationStatus.Failed)
{
break;
}
}
}
finally
{
break;
await uploader.UploadCleanupAsync(Container, Path, cancellationToken);
}
}
else
{
//
// Another instance of the tool is working on the output container,
//
result.Status = MigrationStatus.Skipped;

_logger.LogWarning("Another tool is working on the container {container} and output path: {output}",
Container,
Path);
}
}
else
{
// The asset type is not supported in this milestone,
// Mark the status as Skipped for caller to do the statistics.
result.Status = MigrationStatus.Skipped;

_logger.LogWarning("Skipping container {name} because it is not in a supported format!!!", container.Name);
}

if (_storageOptions.MarkCompleted)
Expand Down
176 changes: 176 additions & 0 deletions azure/AzureStorageUploader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Microsoft.Extensions.Logging;
using System.ComponentModel;
using System.Reflection.Metadata;
using System.Text;
using System.Threading;
using static System.Reflection.Metadata.BlobBuilder;

namespace AMSMigrate.Azure
{
Expand All @@ -15,6 +20,10 @@ internal class AzureStorageUploader : IFileUploader
private readonly ILogger _logger;
private readonly BlobServiceClient _blobServiceClient;

private const string MigrateLock = "__migrate";
private TimeSpan MaximumMigrateTimePerAsset = TimeSpan.FromHours(12);
private string _leaseId;

public AzureStorageUploader(
MigratorOptions options,
TokenCredential credential,
Expand All @@ -27,6 +36,8 @@ public AzureStorageUploader(
storageUri = new Uri($"https://{options.StoragePath}.blob.core.windows.net");
}
_blobServiceClient = new BlobServiceClient(storageUri, credential);

_leaseId = Guid.NewGuid().ToString("D");
}

public Uri GetDestinationUri(string container, string fileName)
Expand Down Expand Up @@ -89,5 +100,170 @@ public async Task UpdateOutputStatus(

await container.SetMetadataAsync(metadata, cancellationToken: cancellationToken);
}

/// <summary>
/// Check if migrator can upload asset files into the specific output folder in storage container.
/// This is to ensure only one migrator can upload the generated files to the destination place at any given time.
/// </summary>
/// <param name="containerName">The name of the output container.</param>
/// <param name="outputPath">The destination folder for the migrated asset. </param>
/// <param name="cancellationToken"></param>
/// <returns>
/// true means this migrator can upload the files to the destination folder,
/// false means the destination folder is used by another migrator tool at the moment.
/// </returns>
public async Task<bool> CanUploadAsync(
string containerName,
string outputPath,
CancellationToken cancellationToken)
{
var canUpload = false;
var retryForAcquire = false;

var lockBlob = await GetLockBlobAsync(containerName, outputPath, cancellationToken);
var leaseClient = lockBlob.GetBlobLeaseClient(_leaseId);

do
{
try
{
retryForAcquire = false;

await leaseClient.AcquireAsync(new TimeSpan(-1), cancellationToken: cancellationToken);

// Acquired the lease successfully, update the LastModified time with a new empty
// list of metadata with the current LeaseId.
await lockBlob.SetMetadataAsync(
new Dictionary<string, string>(),
new BlobRequestConditions() { LeaseId = _leaseId },
cancellationToken);

// Remove old media files that might be uploaded by other migrator tool
// so that new migrator tool will upload a whole set of media files.

var container = _blobServiceClient.GetBlobContainerClient(containerName);

var blobItems = await container.GetBlobsAsync(prefix: outputPath,
cancellationToken: cancellationToken
).ToListAsync();

if (blobItems.Count > 1)
{
foreach (var bi in blobItems)
{
if (!bi.Name.EndsWith(MigrateLock))
{
await container.DeleteBlobAsync(bi.Name, cancellationToken: cancellationToken);
}
}
}

canUpload = true;
}
catch (RequestFailedException ex)
{
if (ex.ErrorCode == "LeaseAlreadyPresent")
{
// The lease is held by another instance of the tool.
// Double check the last modification time of the lock blob,
// If the last modification time was long time ago, it implies the another instance of tool might
// crash or shutdown unexpectedly, it is time to break the lease and re-acquire it.

var properties = await lockBlob.GetPropertiesAsync(cancellationToken: cancellationToken);

var lastModifiedTime = properties.Value.LastModified;
var elapsed = DateTimeOffset.UtcNow - lastModifiedTime;

if (elapsed >= MaximumMigrateTimePerAsset)
{
await leaseClient.BreakAsync(cancellationToken: cancellationToken);

_logger.LogTrace(
"The lease for output path {path} under {container} was broken, the last Modified time was {modified}, it has elapsed for {elapsed} seconds.",
outputPath,
containerName,
lastModifiedTime,
elapsed.TotalSeconds);

retryForAcquire = true;
}
else
{
_logger.LogTrace(
"The lease for output path {path} under {container} is still held by another tool, the last Modified time was {modified}, it has elapsed for {elapsed} seconds.",
outputPath,
containerName,
lastModifiedTime,
elapsed.TotalSeconds);
}
}
else
{
// Failed to acquire lease with unexpected error code.
_logger.LogWarning(
"Failed to acquire the lease for output path {path} under {container}, Error Code: {error}, Message: {message}.",
outputPath,
containerName,
ex.ErrorCode,
ex.Message);
}
}

} while (retryForAcquire);

return canUpload;
}

/// <summary>
/// The instance of migrator has finished the work for uploading files into the destination folder,
/// clean up the status so that the destination folder can be used by another migrator tool.
/// </summary>
/// <param name="containerName">The name of the output container.</param>
/// <param name="outputPath">The destination folder for the migrated asset. </param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task UploadCleanupAsync(
string containerName,
string outputPath,
CancellationToken cancellationToken)
{
var lockBlob = await GetLockBlobAsync(containerName, outputPath, cancellationToken);
var leaseClient = lockBlob.GetBlobLeaseClient(_leaseId);

await leaseClient.ReleaseAsync(cancellationToken: cancellationToken);

_logger.LogTrace("The lease for output path {path} under {container} is released.",
outputPath,
containerName);
}

private async Task<BlockBlobClient> GetLockBlobAsync(
string containerName,
string outputPath,
CancellationToken cancellationToken)
{
var container = _blobServiceClient.GetBlobContainerClient(containerName);
await container.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
var blobName = outputPath;

if (!outputPath.EndsWith("/"))
{
blobName += "/";
}

blobName += MigrateLock;

var lockBlob = container.GetBlockBlobClient(blobName);

var exists = await lockBlob.ExistsAsync(cancellationToken);

if (!exists.Value)
{
var content = Encoding.UTF8.GetBytes("Lock");
await lockBlob.UploadAsync(new MemoryStream(content), cancellationToken: cancellationToken);
}

return lockBlob;
}
}
}
10 changes: 10 additions & 0 deletions contracts/IFileUploader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,15 @@ Task UploadAsync(
Task UpdateOutputStatus(
string containerName,
CancellationToken cancellationToken);

Task<bool> CanUploadAsync(
string containerName,
string outputPath,
CancellationToken cancellationToken);

Task UploadCleanupAsync(
string containerName,
string outputPath,
CancellationToken cancellationToken);
}
}
16 changes: 16 additions & 0 deletions local/LocalFileUploader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ public async Task UpdateOutputStatus(
// Make it no op for local file provider.
return;
}

public async Task<bool> CanUploadAsync(
string containerName,
string outputPath,
CancellationToken cancellationToken)
{
return true;
}

public async Task UploadCleanupAsync(
string containerName,
string outputPath,
CancellationToken cancellationToken)
{
// Nothing to cleanup, make it no-op for local file provider.
}
#pragma warning restore CS1998
}
}
Loading

0 comments on commit 1352823

Please sign in to comment.