diff --git a/Properties/launchSettings.json b/Properties/launchSettings.json index 5645e57..a77a892 100644 --- a/Properties/launchSettings.json +++ b/Properties/launchSettings.json @@ -14,9 +14,11 @@ } }, "WSL": { - "commandName": "WSL2", - "commandLineArgs": "AMSMigrate.dll assets -v debug -s b2c12710-ecdf-4532-9ba6-7e74a219b5f2 -g provenance -n provenanceuswc --storage-account amsencodermsitest -t $web/linux/${AssetName} -f \"name eq 'uploaded-47efd87d00'\"", - "distributionName": "" + "commandName": "WSL2", + "commandLineArgs": "AMSMigrate.dll storage -s b2c12710-ecdf-4532-9ba6-7e74a219b5f2 -g pohhsuTest -n pohhsumediaservice -o pohhsumediaservice -t generated-output/tx-${AssetName} --prefix tearofsteel-ed541c0dbb-StandardEncoder-H264MultipleBitrate1080p-5e8f356300 --skip-migrated false ", + + "commandLineArgs_Assets": "AMSMigrate.dll assets -s b2c12710-ecdf-4532-9ba6-7e74a219b5f2 -g pohhsuTest -n pohhsumediaservice -o pohhsumediaservice -t generated-output/tx-${AssetName} -f \"name eq 'tearofsteel-ed541c0dbb-StandardEncoder-H264MultipleBitrate1080p-5e8f356300'\" --skip-migrated false ", + "distributionName": "" } } } \ No newline at end of file diff --git a/ams/AssetMigrator.cs b/ams/AssetMigrator.cs index 401ad20..474f781 100644 --- a/ams/AssetMigrator.cs +++ b/ams/AssetMigrator.cs @@ -185,30 +185,58 @@ public async Task MigrateAsync( if (result.IsSupportedAsset) { - foreach (var transform in _transformFactory.AssetTransforms) + var uploader = _transformFactory.Uploader; + var (Container, Path) = _transformFactory.TemplateMapper.ExpandAssetTemplate( + record.Asset, + _options.PathTemplate); + + var canUpload = await uploader.CanUploadAsync( + Container, + Path, + cancellationToken); + + if (canUpload) { - var transformResult = (AssetMigrationResult)await transform.RunAsync(record, cancellationToken); + try + { + foreach (var transform in _transformFactory.AssetTransforms) + { + var transformResult = (AssetMigrationResult)await transform.RunAsync(record, cancellationToken); - result.Status = transformResult.Status; - result.OutputPath = transformResult.OutputPath; + result.Status = transformResult.Status; + result.OutputPath = transformResult.OutputPath; - if (result.Status == MigrationStatus.Failed) + if (result.Status == MigrationStatus.Failed) + { + break; + } + } + } + finally { - break; + await uploader.UploadCleanupAsync(Container, Path, cancellationToken); } } + else + { + // + // Another instance of the tool is working on the output container, + // + result.Status = MigrationStatus.Skipped; + + _logger.LogWarning("Another tool is working on the container {container} and output path: {output}", + Container, + Path); + } } else { // The asset type is not supported in this milestone, // Mark the status as Skipped for caller to do the statistics. result.Status = MigrationStatus.Skipped; - } - } - if (result.Status == MigrationStatus.Skipped) - { - _logger.LogWarning("Skipping asset {name} because it is not in a supported format!!!", asset.Data.Name); + _logger.LogWarning("Skipping asset {name} because it is not in a supported format!!!", asset.Data.Name); + } } if (_options.MarkCompleted) diff --git a/ams/StorageMigrator.cs b/ams/StorageMigrator.cs index 030632b..94b0ac1 100644 --- a/ams/StorageMigrator.cs +++ b/ams/StorageMigrator.cs @@ -155,26 +155,58 @@ private async Task MigrateAsync( if (result.IsSupportedAsset) { - var transforms = _transformFactory.StorageTransforms; + var uploader = _transformFactory.Uploader; + var (Container, Path) = _transformFactory.TemplateMapper.ExpandPathTemplate( + assetDetails.Container, + _storageOptions.PathTemplate); - foreach (var transform in transforms) + var canUpload = await uploader.CanUploadAsync( + Container, + Path, + cancellationToken); + if (canUpload) { - var transformResult = await transform.RunAsync(assetDetails, cancellationToken); + try + { + var transforms = _transformFactory.StorageTransforms; + + foreach (var transform in transforms) + { + var transformResult = await transform.RunAsync(assetDetails, cancellationToken); - result.Status = transformResult.Status; - result.OutputPath = transformResult.OutputPath; + result.Status = transformResult.Status; + result.OutputPath = transformResult.OutputPath; - if (result.Status == MigrationStatus.Failed) + if (result.Status == MigrationStatus.Failed) + { + break; + } + } + } + finally { - break; + await uploader.UploadCleanupAsync(Container, Path, cancellationToken); } } + else + { + // + // Another instance of the tool is working on the output container, + // + result.Status = MigrationStatus.Skipped; + + _logger.LogWarning("Another tool is working on the container {container} and output path: {output}", + Container, + Path); + } } else { // The asset type is not supported in this milestone, // Mark the status as Skipped for caller to do the statistics. result.Status = MigrationStatus.Skipped; + + _logger.LogWarning("Skipping container {name} because it is not in a supported format!!!", container.Name); } if (_storageOptions.MarkCompleted) diff --git a/azure/AzureStorageUploader.cs b/azure/AzureStorageUploader.cs index 9badb72..e544726 100644 --- a/azure/AzureStorageUploader.cs +++ b/azure/AzureStorageUploader.cs @@ -6,6 +6,11 @@ using Azure.Storage.Blobs.Models; using Azure.Storage.Blobs.Specialized; using Microsoft.Extensions.Logging; +using System.ComponentModel; +using System.Reflection.Metadata; +using System.Text; +using System.Threading; +using static System.Reflection.Metadata.BlobBuilder; namespace AMSMigrate.Azure { @@ -15,6 +20,10 @@ internal class AzureStorageUploader : IFileUploader private readonly ILogger _logger; private readonly BlobServiceClient _blobServiceClient; + private const string MigrateLock = "__migrate"; + private TimeSpan MaximumMigrateTimePerAsset = TimeSpan.FromHours(12); + private string _leaseId; + public AzureStorageUploader( MigratorOptions options, TokenCredential credential, @@ -27,6 +36,8 @@ public AzureStorageUploader( storageUri = new Uri($"https://{options.StoragePath}.blob.core.windows.net"); } _blobServiceClient = new BlobServiceClient(storageUri, credential); + + _leaseId = Guid.NewGuid().ToString("D"); } public Uri GetDestinationUri(string container, string fileName) @@ -89,5 +100,170 @@ public async Task UpdateOutputStatus( await container.SetMetadataAsync(metadata, cancellationToken: cancellationToken); } + + /// + /// Check if migrator can upload asset files into the specific output folder in storage container. + /// This is to ensure only one migrator can upload the generated files to the destination place at any given time. + /// + /// The name of the output container. + /// The destination folder for the migrated asset. + /// + /// + /// true means this migrator can upload the files to the destination folder, + /// false means the destination folder is used by another migrator tool at the moment. + /// + public async Task CanUploadAsync( + string containerName, + string outputPath, + CancellationToken cancellationToken) + { + var canUpload = false; + var retryForAcquire = false; + + var lockBlob = await GetLockBlobAsync(containerName, outputPath, cancellationToken); + var leaseClient = lockBlob.GetBlobLeaseClient(_leaseId); + + do + { + try + { + retryForAcquire = false; + + await leaseClient.AcquireAsync(new TimeSpan(-1), cancellationToken: cancellationToken); + + // Acquired the lease successfully, update the LastModified time with a new empty + // list of metadata with the current LeaseId. + await lockBlob.SetMetadataAsync( + new Dictionary(), + new BlobRequestConditions() { LeaseId = _leaseId }, + cancellationToken); + + // Remove old media files that might be uploaded by other migrator tool + // so that new migrator tool will upload a whole set of media files. + + var container = _blobServiceClient.GetBlobContainerClient(containerName); + + var blobItems = await container.GetBlobsAsync(prefix: outputPath, + cancellationToken: cancellationToken + ).ToListAsync(); + + if (blobItems.Count > 1) + { + foreach (var bi in blobItems) + { + if (!bi.Name.EndsWith(MigrateLock)) + { + await container.DeleteBlobAsync(bi.Name, cancellationToken: cancellationToken); + } + } + } + + canUpload = true; + } + catch (RequestFailedException ex) + { + if (ex.ErrorCode == "LeaseAlreadyPresent") + { + // The lease is held by another instance of the tool. + // Double check the last modification time of the lock blob, + // If the last modification time was long time ago, it implies the another instance of tool might + // crash or shutdown unexpectedly, it is time to break the lease and re-acquire it. + + var properties = await lockBlob.GetPropertiesAsync(cancellationToken: cancellationToken); + + var lastModifiedTime = properties.Value.LastModified; + var elapsed = DateTimeOffset.UtcNow - lastModifiedTime; + + if (elapsed >= MaximumMigrateTimePerAsset) + { + await leaseClient.BreakAsync(cancellationToken: cancellationToken); + + _logger.LogTrace( + "The lease for output path {path} under {container} was broken, the last Modified time was {modified}, it has elapsed for {elapsed} seconds.", + outputPath, + containerName, + lastModifiedTime, + elapsed.TotalSeconds); + + retryForAcquire = true; + } + else + { + _logger.LogTrace( + "The lease for output path {path} under {container} is still held by another tool, the last Modified time was {modified}, it has elapsed for {elapsed} seconds.", + outputPath, + containerName, + lastModifiedTime, + elapsed.TotalSeconds); + } + } + else + { + // Failed to acquire lease with unexpected error code. + _logger.LogWarning( + "Failed to acquire the lease for output path {path} under {container}, Error Code: {error}, Message: {message}.", + outputPath, + containerName, + ex.ErrorCode, + ex.Message); + } + } + + } while (retryForAcquire); + + return canUpload; + } + + /// + /// The instance of migrator has finished the work for uploading files into the destination folder, + /// clean up the status so that the destination folder can be used by another migrator tool. + /// + /// The name of the output container. + /// The destination folder for the migrated asset. + /// + /// + public async Task UploadCleanupAsync( + string containerName, + string outputPath, + CancellationToken cancellationToken) + { + var lockBlob = await GetLockBlobAsync(containerName, outputPath, cancellationToken); + var leaseClient = lockBlob.GetBlobLeaseClient(_leaseId); + + await leaseClient.ReleaseAsync(cancellationToken: cancellationToken); + + _logger.LogTrace("The lease for output path {path} under {container} is released.", + outputPath, + containerName); + } + + private async Task GetLockBlobAsync( + string containerName, + string outputPath, + CancellationToken cancellationToken) + { + var container = _blobServiceClient.GetBlobContainerClient(containerName); + await container.CreateIfNotExistsAsync(cancellationToken: cancellationToken); + var blobName = outputPath; + + if (!outputPath.EndsWith("/")) + { + blobName += "/"; + } + + blobName += MigrateLock; + + var lockBlob = container.GetBlockBlobClient(blobName); + + var exists = await lockBlob.ExistsAsync(cancellationToken); + + if (!exists.Value) + { + var content = Encoding.UTF8.GetBytes("Lock"); + await lockBlob.UploadAsync(new MemoryStream(content), cancellationToken: cancellationToken); + } + + return lockBlob; + } } } diff --git a/contracts/IFileUploader.cs b/contracts/IFileUploader.cs index 27aa91d..2199490 100644 --- a/contracts/IFileUploader.cs +++ b/contracts/IFileUploader.cs @@ -15,5 +15,15 @@ Task UploadAsync( Task UpdateOutputStatus( string containerName, CancellationToken cancellationToken); + + Task CanUploadAsync( + string containerName, + string outputPath, + CancellationToken cancellationToken); + + Task UploadCleanupAsync( + string containerName, + string outputPath, + CancellationToken cancellationToken); } } diff --git a/local/LocalFileUploader.cs b/local/LocalFileUploader.cs index 4aca5bc..562bbed 100644 --- a/local/LocalFileUploader.cs +++ b/local/LocalFileUploader.cs @@ -48,6 +48,22 @@ public async Task UpdateOutputStatus( // Make it no op for local file provider. return; } + + public async Task CanUploadAsync( + string containerName, + string outputPath, + CancellationToken cancellationToken) + { + return true; + } + + public async Task UploadCleanupAsync( + string containerName, + string outputPath, + CancellationToken cancellationToken) + { + // Nothing to cleanup, make it no-op for local file provider. + } #pragma warning restore CS1998 } } diff --git a/transform/PackageTransform.cs b/transform/PackageTransform.cs index 5460209..769d576 100644 --- a/transform/PackageTransform.cs +++ b/transform/PackageTransform.cs @@ -5,13 +5,13 @@ namespace AMSMigrate.Transform { - internal class PackageTransform : StorageTransform + internal class PackageTransform : StorageTransform { private readonly PackagerFactory _packagerFactory; public PackageTransform( MigratorOptions options, - ILogger> logger, + ILogger logger, TemplateMapper templateMapper, IFileUploader uploader, PackagerFactory factory) diff --git a/transform/TransformFactory.cs b/transform/TransformFactory.cs index da2fb60..e498318 100644 --- a/transform/TransformFactory.cs +++ b/transform/TransformFactory.cs @@ -12,6 +12,9 @@ internal class TransformFactory private readonly List _assetTransforms = new List(); + private readonly IFileUploader _uploader; + private readonly TemplateMapper _templateMapper; + public TransformFactory( ILoggerFactory loggerFactory, TOption options, @@ -19,26 +22,28 @@ public TransformFactory( ICloudProvider cloudProvider) { _storageTransforms = new List(); - + if (options is MigratorOptions migratorOption) { - var uploader = cloudProvider.GetStorageProvider(migratorOption); + _uploader = cloudProvider.GetStorageProvider(migratorOption); + _templateMapper = templateMapper; + var packagerFactory = new PackagerFactory(loggerFactory, migratorOption); if (migratorOption.Packager != Packager.None) { _storageTransforms.Add( - new PackageTransform( + new PackageTransform( migratorOption, - loggerFactory.CreateLogger>(), + loggerFactory.CreateLogger(), templateMapper, - uploader, + _uploader, packagerFactory)); } if (migratorOption.CopyNonStreamable || migratorOption.Packager == Packager.None) { _storageTransforms.Add(new UploadTransform( - migratorOption, uploader, loggerFactory.CreateLogger(), templateMapper)); + migratorOption, _uploader, loggerFactory.CreateLogger(), templateMapper)); } } else @@ -61,6 +66,10 @@ public TransformFactory( public IEnumerable AssetTransforms => _assetTransforms; + public IFileUploader Uploader => _uploader; + + public TemplateMapper TemplateMapper => _templateMapper; + public ITransform TransformTransform => throw new NotImplementedException(); } }