diff --git a/Shift.Cache.Redis/Properties/AssemblyInfo.cs b/Shift.Cache.Redis/Properties/AssemblyInfo.cs index 422446a..18cc5f6 100644 --- a/Shift.Cache.Redis/Properties/AssemblyInfo.cs +++ b/Shift.Cache.Redis/Properties/AssemblyInfo.cs @@ -32,4 +32,4 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.6.3")] +[assembly: AssemblyVersion("1.0.7.0")] diff --git a/Shift.Cache.Redis/Shift.Cache.Redis.csproj b/Shift.Cache.Redis/Shift.Cache.Redis.csproj index 1f83ba7..5617a51 100644 --- a/Shift.Cache.Redis/Shift.Cache.Redis.csproj +++ b/Shift.Cache.Redis/Shift.Cache.Redis.csproj @@ -30,8 +30,8 @@ 4 - - ..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll + + ..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll True diff --git a/Shift.Cache.Redis/packages.config b/Shift.Cache.Redis/packages.config index f330a43..1c151c6 100644 --- a/Shift.Cache.Redis/packages.config +++ b/Shift.Cache.Redis/packages.config @@ -1,5 +1,5 @@  - + \ No newline at end of file diff --git a/Shift.DataLayer/JobDALRedis.cs b/Shift.DataLayer/JobDALRedis.cs index 605150d..342b55b 100644 --- a/Shift.DataLayer/JobDALRedis.cs +++ b/Shift.DataLayer/JobDALRedis.cs @@ -292,7 +292,7 @@ private async Task SetCommandStopAsync(ICollection jobIDs, bool isS var count = 0; var command = JobCommand.Stop.ToString().ToLower(); var jobStopIndex = JobCommandIndexTemplate.Replace("[command]", command); - var jobStopProcess = JobCommandProcessTemplate.Replace("[command]", command); + var jobStopProcessCommand = JobCommandProcessTemplate.Replace("[command]", command); foreach (var jobID in jobIDs) { var jobKey = JobKeyPrefix + jobID; @@ -307,7 +307,7 @@ private async Task SetCommandStopAsync(ICollection jobIDs, bool isS } else { - jobStopProcess = jobStopProcess.Replace("[processid]", job.ProcessID); + var jobStopProcess = jobStopProcessCommand.Replace("[processid]", job.ProcessID); trn.SetAddAsync(jobStopProcess, jobKey); //set job-[command]:[processID] job:123 "" } trn.HashSetAsync(jobKey, JobFields.Command, JobCommand.Stop); diff --git a/Shift.DataLayer/Properties/AssemblyInfo.cs b/Shift.DataLayer/Properties/AssemblyInfo.cs index 470a5d2..02218c7 100644 --- a/Shift.DataLayer/Properties/AssemblyInfo.cs +++ b/Shift.DataLayer/Properties/AssemblyInfo.cs @@ -32,4 +32,4 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.6.3")] +[assembly: AssemblyVersion("1.0.7.0")] diff --git a/Shift.DataLayer/Shift.DataLayer.csproj b/Shift.DataLayer/Shift.DataLayer.csproj index a07a8a7..56ec00d 100644 --- a/Shift.DataLayer/Shift.DataLayer.csproj +++ b/Shift.DataLayer/Shift.DataLayer.csproj @@ -47,8 +47,8 @@ ..\packages\MongoDB.Driver.Core.2.4.3\lib\net45\MongoDB.Driver.Core.dll True - - ..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll + + ..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll True @@ -60,8 +60,8 @@ - - ..\packages\System.Runtime.InteropServices.RuntimeInformation.4.0.0\lib\net45\System.Runtime.InteropServices.RuntimeInformation.dll + + ..\packages\System.Runtime.InteropServices.RuntimeInformation.4.3.0\lib\net45\System.Runtime.InteropServices.RuntimeInformation.dll True diff --git a/Shift.DataLayer/packages.config b/Shift.DataLayer/packages.config index 9c10b6a..421f074 100644 --- a/Shift.DataLayer/packages.config +++ b/Shift.DataLayer/packages.config @@ -4,7 +4,7 @@ - + - + \ No newline at end of file diff --git a/Shift.Entities/Properties/AssemblyInfo.cs b/Shift.Entities/Properties/AssemblyInfo.cs index 7569992..cca516c 100644 --- a/Shift.Entities/Properties/AssemblyInfo.cs +++ b/Shift.Entities/Properties/AssemblyInfo.cs @@ -35,4 +35,4 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.6.3")] +[assembly: AssemblyVersion("1.0.7.0")] diff --git a/Shift.Entities/Shift.Entities.csproj b/Shift.Entities/Shift.Entities.csproj index 64718d4..b9d043d 100644 --- a/Shift.Entities/Shift.Entities.csproj +++ b/Shift.Entities/Shift.Entities.csproj @@ -31,8 +31,8 @@ 4 - - ..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll + + ..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll True diff --git a/Shift.Entities/packages.config b/Shift.Entities/packages.config index e8313a7..cd7d42d 100644 --- a/Shift.Entities/packages.config +++ b/Shift.Entities/packages.config @@ -1,4 +1,4 @@  - + \ No newline at end of file diff --git a/Shift.UnitTest/Properties/AssemblyInfo.cs b/Shift.UnitTest/Properties/AssemblyInfo.cs index e2ade5d..3e0ef41 100644 --- a/Shift.UnitTest/Properties/AssemblyInfo.cs +++ b/Shift.UnitTest/Properties/AssemblyInfo.cs @@ -32,4 +32,4 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.6.3")] +[assembly: AssemblyVersion("1.0.7.0")] diff --git a/Shift/JobServer.cs b/Shift/JobServer.cs index 26c70ec..6d0754f 100644 --- a/Shift/JobServer.cs +++ b/Shift/JobServer.cs @@ -7,7 +7,6 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -using System.Threading; using System.Reflection; using Newtonsoft.Json; @@ -21,14 +20,13 @@ namespace Shift { public class JobServer { - private IJobDAL jobDAL = null; private ServerConfig config = null; private readonly ContainerBuilder builder; private readonly IContainer container; private static System.Timers.Timer timer = null; private static System.Timers.Timer timer2 = null; - private Dictionary taskList = null; //reference to Tasks + private List workerList = null; //reference to Workers /// ///Initializes a new instance of JobServer class, injects data layer with connection and configuration strings. @@ -85,19 +83,23 @@ public JobServer(ServerConfig config) /// private void Initialize() { - this.taskList = new Dictionary(); - - jobDAL = container.Resolve(); + this.workerList = new List(); //OPTIONAL: Load all EXTERNAL DLLs needed by this process AppDomain.CurrentDomain.AssemblyResolve += AssemblyHelpers.OnAssemblyResolve; AssemblyHelpers.LoadAssemblies(config.AssemblyFolder, config.AssemblyListPath); + + //Create Worker + for(var i=1; i <= config.Workers; i++) + { + var worker = new Worker(config, container, i); + workerList.Add(worker); + } } #endregion #region Server Run and Manage jobs - //The region that primarily manage and run/stop/cleanup jobs that were added in the DB table by the clients /// /// Run jobs server in a scheduled timer interval. @@ -120,7 +122,7 @@ private async Task RunServerAsync(bool isSync) if (isSync) CleanUp(); else - await CleanUpAsync(); + await CleanUpAsync(isSync); if (timer == null && timer2 == null) { @@ -147,7 +149,7 @@ private async Task RunServerAsync(bool isSync) if (isSync) CleanUp(); else - await CleanUpAsync(); + await CleanUpAsync(isSync); }; } @@ -166,49 +168,6 @@ private async Task RunServerAsync(bool isSync) timer2.Start(); } - /// - /// Stop running jobs server. - /// - public void StopServer() - { - //Stop timers - if (timer != null && timer2 != null) - { - timer.Close(); - timer2.Close(); - } - - //Stop all running Jobs - var runningJobsList = jobDAL.GetJobsByProcessAndStatus(config.ProcessID, JobStatus.Running); - if(runningJobsList.Count() > 0) - { - jobDAL.SetCommandStop(runningJobsList.Select(x => x.JobID).ToList()); - } - - //Stop jobs marked with 'stop' command - StopJobs(); - } - - public async Task StopServerAsync() - { - //Stop timers - if (timer != null && timer2 != null) - { - timer.Close(); - timer2.Close(); - } - - //Stop all running Jobs - var runningJobsList = await jobDAL.GetJobsByProcessAndStatusAsync(config.ProcessID, JobStatus.Running); - if (runningJobsList.Count() > 0) - { - await jobDAL.SetCommandStopAsync(runningJobsList.Select(x => x.JobID).ToList()); - } - - //Stop jobs marked with 'stop' command - await StopJobsAsync(); - } - /// /// Pick up jobs from storage and run them. /// @@ -217,24 +176,20 @@ public void RunJobs() RunJobsAsync(true).GetAwaiter().GetResult(); } - public Task RunJobsAsync() + public async Task RunJobsAsync() { - return RunJobsAsync(false); + await RunJobsAsync(false); } - private async Task RunJobsAsync(bool isSync) + public async Task RunJobsAsync(bool isSync) { - //Check max jobs count - var runningCount = isSync ? jobDAL.CountRunningJobs(config.ProcessID) : await jobDAL.CountRunningJobsAsync(config.ProcessID); - if (runningCount >= config.MaxRunnableJobs) + foreach (var worker in workerList) { - return; + if (isSync) + worker.RunJobsAsync(isSync).GetAwaiter().GetResult(); + else + await worker.RunJobsAsync(isSync); } - - var rowsToGet = config.MaxRunnableJobs - runningCount; - var claimedJobs = isSync ? jobDAL.ClaimJobsToRun(config.ProcessID, rowsToGet) : await jobDAL.ClaimJobsToRunAsync(config.ProcessID, rowsToGet); - - RunClaimedJobsAsync(claimedJobs, isSync); } /// @@ -254,304 +209,139 @@ public Task RunJobsAsync(IEnumerable jobIDs) private async Task RunJobsAsync(IEnumerable jobIDs, bool isSync) { - //Try to start the selected jobs, ignoring MaxRunableJobs - var jobList = isSync ? jobDAL.GetNonRunningJobsByIDs(jobIDs) : await jobDAL.GetNonRunningJobsByIDsAsync(jobIDs); - var claimedJobs = isSync ? jobDAL.ClaimJobsToRun(config.ProcessID, jobList.ToList()) : await jobDAL.ClaimJobsToRunAsync(config.ProcessID, jobList.ToList()); - - RunClaimedJobsAsync(claimedJobs, isSync); - } - - //Finally Run the Jobs - private async Task RunClaimedJobsAsync(IEnumerable jobList, bool isSync) - { - if (jobList.Count() == 0) - return; - - foreach (var job in jobList) + foreach (var worker in workerList) { - try - { - var decryptedParameters = Entities.Helpers.Decrypt(job.Parameters, config.EncryptionKey); - - CreateTask(job.ProcessID, job.JobID, job.InvokeMeta, decryptedParameters, isSync); //Use the DecryptedParameters, NOT encrypted Parameters - } - catch (Exception exc) - { - var error = job.Error + " " + exc.ToString(); - var processID = string.IsNullOrWhiteSpace(job.ProcessID) ? config.ProcessID : job.ProcessID; - var count = isSync ? SetErrorAsync(processID, job.JobID, error, isSync).GetAwaiter().GetResult() - : await SetErrorAsync(processID, job.JobID, error, isSync); - } + if (isSync) + worker.RunJobsAsync(jobIDs, isSync).GetAwaiter().GetResult(); + else + await worker.RunJobsAsync(jobIDs, isSync); } } - private static Type GetTypeFromAllAssemblies(string typeName) + public void StopJobs() { - //try this domain first - var type = Type.GetType(typeName); - - if (type != null) - return type; - - //Get all assemblies - List assemblies = AppDomain.CurrentDomain.GetAssemblies().ToList(); - - foreach (var assembly in assemblies) - { - Type t = assembly.GetType(typeName, false); - if (t != null) - return t; - } - throw new ArgumentException("Type " + typeName + " doesn't exist in the current app domain"); + StopJobsAsync(true).GetAwaiter().GetResult(); } - //Create the thread that will run the job - private void CreateTask(string processID, string jobID, string invokeMeta, string parameters, bool isSync) + public async Task StopJobsAsync() { - var invokeMetaObj = JsonConvert.DeserializeObject(invokeMeta, SerializerSettings.Settings); + await StopJobsAsync(false); + } - var type = GetTypeFromAllAssemblies(invokeMetaObj.Type); - var parameterTypes = JsonConvert.DeserializeObject(invokeMetaObj.ParameterTypes, SerializerSettings.Settings); - var methodInfo = Helpers.GetNonOpenMatchingMethod(type, invokeMetaObj.Method, parameterTypes); - if (methodInfo == null) - { - throw new InvalidOperationException(string.Format("The type '{0}' has no method with signature '{1}({2})'", type.FullName, invokeMetaObj.Method, string.Join(", ", parameterTypes.Select(x => x.Name)))); - } - object instance = null; - if (!methodInfo.IsStatic) //not static? + private async Task StopJobsAsync(bool isSync) + { + foreach (var worker in workerList) { - instance = Helpers.CreateInstance(type); //create object method instance + if (isSync) + worker.StopJobsAsync(isSync).GetAwaiter().GetResult(); + else + await worker.StopJobsAsync(isSync); } + } - Task jobTask = null; - if (taskList.ContainsKey(jobID)) + /// + /// Stop running jobs server. + /// + public void StopServer() + { + //Stop timers + if (timer != null && timer2 != null) { - jobTask = taskList[jobID].JobTask; - if (jobTask != null && !jobTask.IsCompleted) //already running and NOT completed? - return; + timer.Close(); + timer2.Close(); } - //Don't use ConfigureWait(false), since some tasks don't have Cancellation token and must use the original context to return after completion - var tokenSource = new CancellationTokenSource(); - var token = tokenSource.Token; - - var taskInfo = new TaskInfo(); - taskInfo.TokenSource = tokenSource; - //Keep track of running thread - //If using Task.Run(), MUST register task in TaskList right away if NOT the CleanUp() method may mark running Task as Error, because it's running but not in list!!! - taskList[jobID] = taskInfo; - if (isSync) + if (config.ForceStopServer) { - jobTask = Task.Run(() => ExecuteJobAsync(processID, jobID, methodInfo, parameters, instance, token, isSync).GetAwaiter().GetResult(), token) - .ContinueWith(t => - { - DeleteCachedProgressDelayedAsync(jobID); - }, TaskContinuationOptions.RunContinuationsAsynchronously); + AttemptToStopJobsAsync(true).GetAwaiter().GetResult(); } else { - jobTask = Task.Run(async () => await ExecuteJobAsync(processID, jobID, methodInfo, parameters, instance, token, isSync), token) - .ContinueWith(t => - { - DeleteCachedProgressDelayedAsync(jobID); - }, TaskContinuationOptions.RunContinuationsAsynchronously); + WaitForAllRunningJobsToStop(true).GetAwaiter().GetResult(); } - taskInfo.JobTask = jobTask; - taskList[jobID] = taskInfo; //re-update with filled Task } - private async Task> UpdateProgressEventAsync(string jobID, bool isSync) + public async Task StopServerAsync() { - //Insert a progress row first for the related jobID if it doesn't exist - if (isSync) + //Stop timers + if (timer != null && timer2 != null) { - jobDAL.SetProgress(jobID, null, null, null); + timer.Close(); + timer2.Close(); } - else + + if (config.ForceStopServer) { - await jobDAL.SetProgressAsync(jobID, null, null, null); + await AttemptToStopJobsAsync(false); } - await jobDAL.SetCachedProgressAsync(jobID, null, null, null).ConfigureAwait(false); - - var start = DateTime.Now; - var updateTs = config.ProgressDBInterval ?? new TimeSpan(0, 0, 10); //default to 10 sec interval for updating DB - - //SynchronousProgress is event based and called regularly by the running job - SynchronousProgress progress = new SynchronousProgress(progressInfo => + else { - jobDAL.SetCachedProgressAsync(jobID, progressInfo.Percent, progressInfo.Note, progressInfo.Data).ConfigureAwait(false); //Update Cache - - var diffTs = DateTime.Now - start; - if (diffTs >= updateTs || progressInfo.Percent >= 100) - { - jobDAL.UpdateProgressAsync(jobID, progressInfo.Percent, progressInfo.Note, progressInfo.Data).ConfigureAwait(false); //Update DB async, don't wait/don't hold - start = DateTime.Now; - } - }); - - return progress; + await WaitForAllRunningJobsToStop(false); + } } - private async Task ExecuteJobAsync(string processID, string jobID, MethodInfo methodInfo, string parameters, object instance, CancellationToken? token, bool isSync) + private async Task SetStopAllRunningJobsAsync(bool isSync) { - try + foreach(var worker in workerList) { - //Set job to Running if (isSync) - jobDAL.SetToRunning(processID, jobID); + worker.SetStopAllRunningJobsAsync(isSync).GetAwaiter().GetResult(); else - await jobDAL.SetToRunningAsync(processID, jobID); - jobDAL.SetCachedProgressStatusAsync(jobID, JobStatus.Running); - - var progress = isSync ? UpdateProgressEventAsync(jobID, true).GetAwaiter().GetResult() : await UpdateProgressEventAsync(jobID, false); //Need this to update the progress of the job's - - //Invoke Method - if(token == null) - { - var tokenSource = new CancellationTokenSource(); //not doing anything when using thread.Start() - token = tokenSource.Token; - } - var args = DALHelpers.DeserializeArguments(token.Value, progress, methodInfo, parameters); - methodInfo.Invoke(instance, args); + await worker.SetStopAllRunningJobsAsync(isSync); } - catch (TargetInvocationException exc) + } + + private async Task AttemptToStopJobsAsync(bool isSync) + { + if (isSync) { - if (exc.InnerException is OperationCanceledException) - { - if (isSync) - { - SetToStoppedAsync(new List { jobID }, isSync).GetAwaiter().GetResult(); - } - else - { - await SetToStoppedAsync(new List { jobID }, isSync); - } - - throw exc.InnerException; - } - else - { - var job = isSync ? jobDAL.GetJob(jobID) : await jobDAL.GetJobAsync(jobID); - var error = job.Error + " " + exc.ToString(); - var count = isSync ? SetErrorAsync(processID, job.JobID, error, isSync).GetAwaiter().GetResult() - : await SetErrorAsync(processID, job.JobID, error, isSync); - - throw exc; - } + SetStopAllRunningJobsAsync(true).GetAwaiter().GetResult(); + StopJobsAsync(true).GetAwaiter().GetResult(); + Task.Delay(config.StopServerDelay).GetAwaiter().GetResult(); //Wait before shutting down } - catch (Exception exc) + else { - var job = isSync ? jobDAL.GetJob(jobID) : await jobDAL.GetJobAsync(jobID); - var error = job.Error + " " + exc.ToString(); - var count = isSync ? SetErrorAsync(processID, job.JobID, error, isSync).GetAwaiter().GetResult() - : await SetErrorAsync(processID, job.JobID, error, isSync); - - throw exc; + await SetStopAllRunningJobsAsync(false); //mark 'stop' commands + await StopJobsAsync(false); //stop jobs + await Task.Delay(config.StopServerDelay); //Wait before shutting down } - - //Completed successfully with no error - if (isSync) - jobDAL.SetCompleted(processID, jobID); - else - await jobDAL.SetCompletedAsync(processID, jobID); - jobDAL.SetCachedProgressStatusAsync(jobID, JobStatus.Completed); } - private Task DeleteCachedProgressDelayedAsync(string jobID) + //Clean up periodically and wait for all running tasks to Stop before quitting + private async Task WaitForAllRunningJobsToStop(bool isSync) { - return Task.Delay(60000).ContinueWith(_ => + var stopNow = false; + while (!stopNow) { - jobDAL.DeleteCachedProgressAsync(jobID); - }, TaskContinuationOptions.RunContinuationsAsynchronously); - } - - private async Task SetErrorAsync(string processID, string jobID, string error, bool isSync) - { - jobDAL.SetCachedProgressErrorAsync(jobID, error); - if (isSync) - return jobDAL.SetError(processID, jobID, error); - else - return await jobDAL.SetErrorAsync(processID, jobID, error); - } - - /// - /// Stops jobs. - /// Only jobs marked with "STOP" command will be acted on. - /// ThreadMode="task" will use CancellationTokenSource.Cancel() - /// Make sure the jobs implement CancellationToken.IsCancellationRequested check for throwing and clean up canceled job. - /// ThreadMode="thread" will use Thread.Abort. - /// No clean up is possible when thread is aborted. - /// - public void StopJobs() - { - var jobIDs = jobDAL.GetJobIdsByProcessAndCommand(config.ProcessID, JobCommand.Stop); - StopJobsAsync(jobIDs, true).GetAwaiter().GetResult(); - } + var runningCount = 0; - public async Task StopJobsAsync() - { - var jobIDs = await jobDAL.GetJobIdsByProcessAndCommandAsync(config.ProcessID, JobCommand.Stop); - await StopJobsAsync(jobIDs, false); - } + if (isSync) + AttemptToStopJobsAsync(isSync).GetAwaiter().GetResult(); + else + await AttemptToStopJobsAsync(isSync); - private async Task StopJobsAsync(IReadOnlyCollection jobIDs, bool isSync) - { - var nonWaitJobIDs = new List(); - if (taskList.Count > 0) - { - foreach (var jobID in jobIDs) + //check for still running jobs + foreach (var worker in workerList) { - var taskInfo = taskList.ContainsKey(jobID) ? taskList[jobID] : null; - if (taskInfo != null) + if (isSync) { - if (!taskInfo.TokenSource.Token.IsCancellationRequested) - { - taskInfo.TokenSource.Cancel(); //attempt to cancel task - Task.Run(async () => await taskInfo.JobTask) - .ContinueWith(result => { taskList.Remove(jobID); }); //Don't hold the process, just run another task to wait for cancellable task - } + worker.CleanUpAsync(isSync).GetAwaiter().GetResult(); + runningCount += worker.CountRunningJobsAsync(isSync).GetAwaiter().GetResult(); } else { - nonWaitJobIDs.Add(jobID); + await worker.CleanUpAsync(isSync); + runningCount += await worker.CountRunningJobsAsync(isSync); } } - //Set to stopped for nonWaitJobIDs - if (isSync) - { - SetToStoppedAsync(nonWaitJobIDs, true).GetAwaiter().GetResult(); - } - else - { - await SetToStoppedAsync(nonWaitJobIDs, false); - } - } - else - { - if (isSync) - { - SetToStoppedAsync(jobIDs, true).GetAwaiter().GetResult(); - } - else + if (runningCount == 0) { - await SetToStoppedAsync(jobIDs, false); + stopNow = true; + break; } } - } - private async Task SetToStoppedAsync(IReadOnlyCollection jobIDs, bool isSync) - { - if(isSync) - { - jobDAL.SetToStopped(jobIDs.ToList()); - } - else - { - await jobDAL.SetToStoppedAsync(jobIDs.ToList()); - } - jobDAL.SetCachedProgressStatusAsync(jobIDs, JobStatus.Stopped); //redis cached progress - jobDAL.DeleteCachedProgressAsync(jobIDs); } /// @@ -562,92 +352,29 @@ private async Task SetToStoppedAsync(IReadOnlyCollection jobIDs, bool is /// public void CleanUp() { - StopJobs(); - - //Delete past completed jobs from storage - if (config.AutoDeletePeriod != null) - { - jobDAL.Delete(config.AutoDeletePeriod.Value, config.AutoDeleteStatus); - } - CleanUpAsync(true).GetAwaiter().GetResult(); } public async Task CleanUpAsync() { - await StopJobsAsync(); - - //Delete past completed jobs from storage - if (config.AutoDeletePeriod != null) - { - await jobDAL.DeleteAsync(config.AutoDeletePeriod.Value, config.AutoDeleteStatus); - } - await CleanUpAsync(false); } - private async Task CleanUpAsync(bool isSync) + public async Task CleanUpAsync(bool isSync) { - var jobList = isSync ? jobDAL.GetJobsByProcessAndStatus(config.ProcessID, JobStatus.Running) : await jobDAL.GetJobsByProcessAndStatusAsync(config.ProcessID, JobStatus.Running); - foreach (var job in jobList) + foreach (var worker in workerList) { - if (!taskList.ContainsKey(job.JobID)) - { - //Doesn't exist anymore? - var error = "Error: No actual running job process found. Try reset and run again."; - var processID = string.IsNullOrWhiteSpace(job.ProcessID) ? config.ProcessID : job.ProcessID; - var count = isSync ? SetErrorAsync(processID, job.JobID, error, isSync).GetAwaiter().GetResult() - : await SetErrorAsync(processID, job.JobID, error, isSync); - } + if (isSync) + worker.CleanUpAsync(isSync).GetAwaiter().GetResult(); + else + await worker.CleanUpAsync(isSync); } + } - if (taskList.Count > 0) - { - var inDBjobIDs = new List(); - jobList = isSync ? jobDAL.GetJobs(taskList.Keys.ToList()) : await jobDAL.GetJobsAsync(taskList.Keys.ToList()); //get all jobs in taskList - - // If jobs doesn't even exists in storage (zombie?), remove from taskList. - inDBjobIDs = jobList.Select(j => j.JobID).ToList(); - var taskListKeys = new List(taskList.Keys); //copy keys before removal - foreach (var jobID in taskListKeys) - { - if (!inDBjobIDs.Contains(jobID)) - { - TaskInfo taskInfo = null; - if (taskList.Keys.Contains(jobID)) - taskInfo = taskList[jobID]; - else - continue; - - taskInfo.TokenSource.Cancel(); //attempt to cancel - taskList.Remove(jobID); - } - } - - // For job status that is stopped, error, completed => Remove from thread list, no need to keep track of them anymore. - var statuses = new List - { - (int)JobStatus.Stopped, - (int)JobStatus.Error, - (int)JobStatus.Completed - }; + #endregion - foreach (var job in jobList) - { - if (job.Status != null - && statuses.Contains((int)job.Status) - && taskList.ContainsKey(job.JobID)) - { - var taskInfo = taskList[job.JobID]; - taskInfo.TokenSource.Dispose(); - taskList.Remove(job.JobID); - } - } - } - } - #endregion } } diff --git a/Shift/Properties/AssemblyInfo.cs b/Shift/Properties/AssemblyInfo.cs index 933783d..d0d8215 100644 --- a/Shift/Properties/AssemblyInfo.cs +++ b/Shift/Properties/AssemblyInfo.cs @@ -32,4 +32,4 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.6.3")] +[assembly: AssemblyVersion("1.0.7.0")] diff --git a/Shift/ServerConfig.cs b/Shift/ServerConfig.cs index d87ce29..7fcd89f 100644 --- a/Shift/ServerConfig.cs +++ b/Shift/ServerConfig.cs @@ -18,7 +18,7 @@ public class ServerConfig [Required] public string StorageMode { get; set; } = Shift.StorageMode.Redis; //mssql, redis, etc... - //Maximum jobs to run for each server + //Maximum jobs to run for each worker public int MaxRunnableJobs { get; set; } = 100; //Points to a text file with list of DLLs to load, one DLL per line. @@ -40,9 +40,13 @@ public class ServerConfig public int ServerTimerInterval { get; set; } = 5000; //interval timer for server running jobs public int ServerTimerInterval2 { get; set; } = 10000; //interval timer2 for server running cleanup - public int? AutoDeletePeriod { get; set; } = 168; //Default = 7 days; hours before jobs are deleted + public int? AutoDeletePeriod { get; set; } //Default = null; hours before jobs are deleted public IList AutoDeleteStatus { get; set; } = new List { JobStatus.Completed }; //Jobs with status to delete public bool PollingOnce { get; set; } = false; //Poll server only once, useful for debugging + + public int Workers { get; set; } = 1; //How many workers per server. + public bool ForceStopServer { get; set; } = false;//If true, no waiting for all running jobs to stop before shutting down server + public int StopServerDelay { get; set; } = 30000; //Delay shut down } } diff --git a/Shift/Shift.csproj b/Shift/Shift.csproj index 455ae3a..449c664 100644 --- a/Shift/Shift.csproj +++ b/Shift/Shift.csproj @@ -34,16 +34,16 @@ bin\Release\Shift.XML - - ..\packages\Autofac.4.4.0\lib\net45\Autofac.dll + + ..\packages\Autofac.4.5.0\lib\net45\Autofac.dll True ..\packages\Dapper.1.50.2\lib\net451\Dapper.dll True - - ..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll + + ..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll True @@ -69,6 +69,7 @@ + diff --git a/Shift/Worker.cs b/Shift/Worker.cs new file mode 100644 index 0000000..7911e4a --- /dev/null +++ b/Shift/Worker.cs @@ -0,0 +1,472 @@ +using Autofac; +using Shift.Entities; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using System.Reflection; + +using Newtonsoft.Json; +using Shift.DataLayer; + +namespace Shift +{ + public class Worker + { + private ServerConfig config = null; + private IJobDAL jobDAL = null; + private readonly IContainer container; + private Dictionary taskList = null; //reference to Tasks + + private int workerID; + private string workerProcessID; + + public Worker(ServerConfig config, IContainer container, int workerID) + { + taskList = new Dictionary(); + jobDAL = container.Resolve(); + this.workerID = workerID; + this.config = config; + + this.workerProcessID = config.ProcessID + "-" + workerID; + } + + public async Task CountRunningJobsAsync(bool isSync) + { + var runningCount = isSync ? jobDAL.CountRunningJobs(workerProcessID) : await jobDAL.CountRunningJobsAsync(workerProcessID); + return runningCount; + } + + public async Task RunJobsAsync(bool isSync) + { + //Check max jobs count + var runningCount = isSync ? CountRunningJobsAsync(isSync).GetAwaiter().GetResult() : await CountRunningJobsAsync(isSync); + if (runningCount >= config.MaxRunnableJobs) + { + return; + } + + var rowsToGet = config.MaxRunnableJobs - runningCount; + var claimedJobs = isSync ? jobDAL.ClaimJobsToRun(workerProcessID, rowsToGet) : await jobDAL.ClaimJobsToRunAsync(workerProcessID, rowsToGet); + + RunClaimedJobsAsync(claimedJobs, isSync); + } + + public async Task RunJobsAsync(IEnumerable jobIDs, bool isSync) + { + //Try to start the selected jobs, ignoring MaxRunableJobs + var jobList = isSync ? jobDAL.GetNonRunningJobsByIDs(jobIDs) : await jobDAL.GetNonRunningJobsByIDsAsync(jobIDs); + var claimedJobs = isSync ? jobDAL.ClaimJobsToRun(workerProcessID, jobList.ToList()) : await jobDAL.ClaimJobsToRunAsync(workerProcessID, jobList.ToList()); + + RunClaimedJobsAsync(claimedJobs, isSync); + } + + //Finally Run the Jobs + private async Task RunClaimedJobsAsync(IEnumerable jobList, bool isSync) + { + if (jobList.Count() == 0) + return; + + foreach (var job in jobList) + { + try + { + var decryptedParameters = Entities.Helpers.Decrypt(job.Parameters, config.EncryptionKey); + + CreateTask(job.ProcessID, job.JobID, job.InvokeMeta, decryptedParameters, isSync); //Use the DecryptedParameters, NOT encrypted Parameters + } + catch (Exception exc) + { + var error = job.Error + " " + exc.ToString(); + var processID = string.IsNullOrWhiteSpace(job.ProcessID) ? workerProcessID : job.ProcessID; + var count = isSync ? SetErrorAsync(processID, job.JobID, error, isSync).GetAwaiter().GetResult() + : await SetErrorAsync(processID, job.JobID, error, isSync); + } + } + } + + private static Type GetTypeFromAllAssemblies(string typeName) + { + //try this domain first + var type = Type.GetType(typeName); + + if (type != null) + return type; + + //Get all assemblies + List assemblies = AppDomain.CurrentDomain.GetAssemblies().ToList(); + + foreach (var assembly in assemblies) + { + Type t = assembly.GetType(typeName, false); + if (t != null) + return t; + } + throw new ArgumentException("Type " + typeName + " doesn't exist in the current app domain"); + } + + //Create the thread that will run the job + private void CreateTask(string processID, string jobID, string invokeMeta, string parameters, bool isSync) + { + var invokeMetaObj = JsonConvert.DeserializeObject(invokeMeta, SerializerSettings.Settings); + + var type = GetTypeFromAllAssemblies(invokeMetaObj.Type); + var parameterTypes = JsonConvert.DeserializeObject(invokeMetaObj.ParameterTypes, SerializerSettings.Settings); + var methodInfo = Helpers.GetNonOpenMatchingMethod(type, invokeMetaObj.Method, parameterTypes); + if (methodInfo == null) + { + throw new InvalidOperationException(string.Format("The type '{0}' has no method with signature '{1}({2})'", type.FullName, invokeMetaObj.Method, string.Join(", ", parameterTypes.Select(x => x.Name)))); + } + object instance = null; + if (!methodInfo.IsStatic) //not static? + { + instance = Helpers.CreateInstance(type); //create object method instance + } + + Task jobTask = null; + if (taskList.ContainsKey(jobID)) + { + jobTask = taskList[jobID].JobTask; + if (jobTask != null && !jobTask.IsCompleted) //already running and NOT completed? + return; + } + + //Don't use ConfigureWait(false), since some tasks don't have Cancellation token and must use the original context to return after completion + var tokenSource = new CancellationTokenSource(); + var token = tokenSource.Token; + + var taskInfo = new TaskInfo(); + taskInfo.TokenSource = tokenSource; + //Keep track of running thread + //If using Task.Run(), MUST register task in TaskList right away if NOT the CleanUp() method may mark running Task as Error, because it's running but not in list!!! + taskList[jobID] = taskInfo; + if (isSync) + { + jobTask = Task.Run(() => ExecuteJobAsync(processID, jobID, methodInfo, parameters, instance, token, isSync).GetAwaiter().GetResult(), token) + .ContinueWith(t => + { + DeleteCachedProgressDelayedAsync(jobID); + }, TaskContinuationOptions.RunContinuationsAsynchronously); + } + else + { + jobTask = Task.Run(async () => await ExecuteJobAsync(processID, jobID, methodInfo, parameters, instance, token, isSync), token) + .ContinueWith(t => + { + DeleteCachedProgressDelayedAsync(jobID); + }, TaskContinuationOptions.RunContinuationsAsynchronously); + } + taskInfo.JobTask = jobTask; + taskList[jobID] = taskInfo; //re-update with filled Task + } + + private async Task> UpdateProgressEventAsync(string jobID, bool isSync) + { + //Insert a progress row first for the related jobID if it doesn't exist + if (isSync) + { + jobDAL.SetProgress(jobID, null, null, null); + } + else + { + await jobDAL.SetProgressAsync(jobID, null, null, null); + } + await jobDAL.SetCachedProgressAsync(jobID, null, null, null).ConfigureAwait(false); + + var start = DateTime.Now; + var updateTs = config.ProgressDBInterval ?? new TimeSpan(0, 0, 10); //default to 10 sec interval for updating DB + + //SynchronousProgress is event based and called regularly by the running job + SynchronousProgress progress = new SynchronousProgress(progressInfo => + { + jobDAL.SetCachedProgressAsync(jobID, progressInfo.Percent, progressInfo.Note, progressInfo.Data).ConfigureAwait(false); //Update Cache + + var diffTs = DateTime.Now - start; + if (diffTs >= updateTs || progressInfo.Percent >= 100) + { + jobDAL.UpdateProgressAsync(jobID, progressInfo.Percent, progressInfo.Note, progressInfo.Data).ConfigureAwait(false); //Update DB async, don't wait/don't hold + start = DateTime.Now; + } + }); + + return progress; + } + + private async Task ExecuteJobAsync(string processID, string jobID, MethodInfo methodInfo, string parameters, object instance, CancellationToken? token, bool isSync) + { + try + { + //Set job to Running + if (isSync) + jobDAL.SetToRunning(processID, jobID); + else + await jobDAL.SetToRunningAsync(processID, jobID); + jobDAL.SetCachedProgressStatusAsync(jobID, JobStatus.Running); + + var progress = isSync ? UpdateProgressEventAsync(jobID, true).GetAwaiter().GetResult() : await UpdateProgressEventAsync(jobID, false); //Need this to update the progress of the job's + + //Invoke Method + if (token == null) + { + var tokenSource = new CancellationTokenSource(); //not doing anything when using thread.Start() + token = tokenSource.Token; + } + var args = DALHelpers.DeserializeArguments(token.Value, progress, methodInfo, parameters); + methodInfo.Invoke(instance, args); + } + catch (TargetInvocationException exc) + { + if (exc.InnerException is OperationCanceledException) + { + if (isSync) + { + SetToStoppedAsync(new List { jobID }, isSync).GetAwaiter().GetResult(); + } + else + { + await SetToStoppedAsync(new List { jobID }, isSync); + } + + throw exc.InnerException; + } + else + { + var job = isSync ? jobDAL.GetJob(jobID) : await jobDAL.GetJobAsync(jobID); + var error = job.Error + " " + exc.ToString(); + var count = isSync ? SetErrorAsync(processID, job.JobID, error, isSync).GetAwaiter().GetResult() + : await SetErrorAsync(processID, job.JobID, error, isSync); + + throw exc; + } + } + catch (Exception exc) + { + var job = isSync ? jobDAL.GetJob(jobID) : await jobDAL.GetJobAsync(jobID); + var error = job.Error + " " + exc.ToString(); + var count = isSync ? SetErrorAsync(processID, job.JobID, error, isSync).GetAwaiter().GetResult() + : await SetErrorAsync(processID, job.JobID, error, isSync); + + throw exc; + } + + //Completed successfully with no error + if (isSync) + jobDAL.SetCompleted(processID, jobID); + else + await jobDAL.SetCompletedAsync(processID, jobID); + jobDAL.SetCachedProgressStatusAsync(jobID, JobStatus.Completed); + } + + private Task DeleteCachedProgressDelayedAsync(string jobID) + { + return Task.Delay(60000).ContinueWith(_ => + { + jobDAL.DeleteCachedProgressAsync(jobID); + }, TaskContinuationOptions.RunContinuationsAsynchronously); + } + + private async Task SetErrorAsync(string processID, string jobID, string error, bool isSync) + { + jobDAL.SetCachedProgressErrorAsync(jobID, error); + if (isSync) + return jobDAL.SetError(processID, jobID, error); + else + return await jobDAL.SetErrorAsync(processID, jobID, error); + } + + //Called when Server is being shut down. + //Mark all running jobs to stop. + public async Task SetStopAllRunningJobsAsync(bool isSync) + { + if (isSync) + { + //Stop all running Jobs + var runningJobsList = jobDAL.GetJobsByProcessAndStatus(workerProcessID, JobStatus.Running); + if (runningJobsList.Count() > 0) + { + jobDAL.SetCommandStop(runningJobsList.Select(x => x.JobID).ToList()); + } + } + else + { + //Stop all running Jobs + var runningJobsList = await jobDAL.GetJobsByProcessAndStatusAsync(workerProcessID, JobStatus.Running); + if (runningJobsList.Count() > 0) + { + await jobDAL.SetCommandStopAsync(runningJobsList.Select(x => x.JobID).ToList()); + } + } + } + + /// + /// Stops jobs. + /// Only jobs marked with "STOP" command will be acted on. + /// ThreadMode="task" will use CancellationTokenSource.Cancel() + /// Make sure the jobs implement CancellationToken.IsCancellationRequested check for throwing and clean up canceled job. + /// ThreadMode="thread" will use Thread.Abort. + /// No clean up is possible when thread is aborted. + /// + public async Task StopJobsAsync(bool isSync) + { + var jobIDs = isSync ? jobDAL.GetJobIdsByProcessAndCommand(workerProcessID, JobCommand.Stop) : await jobDAL.GetJobIdsByProcessAndCommandAsync(workerProcessID, JobCommand.Stop); + if (isSync) + StopJobsAsync(jobIDs, isSync).GetAwaiter().GetResult(); + else + await StopJobsAsync(jobIDs, isSync); + } + + private async Task StopJobsAsync(IReadOnlyCollection jobIDs, bool isSync) + { + var nonWaitJobIDs = new List(); + if (taskList.Count > 0) + { + foreach (var jobID in jobIDs) + { + var taskInfo = taskList.ContainsKey(jobID) ? taskList[jobID] : null; + if (taskInfo != null) + { + if (!taskInfo.TokenSource.Token.IsCancellationRequested) + { + taskInfo.TokenSource.Cancel(); //attempt to cancel task + Task.Run(async () => await taskInfo.JobTask) + .ContinueWith(result => { taskList.Remove(jobID); }); //Don't hold the process, just run another task to wait for cancellable task + } + } + else + { + nonWaitJobIDs.Add(jobID); + } + } + + //Set to stopped for nonWaitJobIDs + if (isSync) + { + SetToStoppedAsync(nonWaitJobIDs, true).GetAwaiter().GetResult(); + } + else + { + await SetToStoppedAsync(nonWaitJobIDs, false); + } + } + else + { + if (isSync) + { + SetToStoppedAsync(jobIDs, true).GetAwaiter().GetResult(); + } + else + { + await SetToStoppedAsync(jobIDs, false); + } + } + } + + private async Task SetToStoppedAsync(IReadOnlyCollection jobIDs, bool isSync) + { + if (isSync) + { + jobDAL.SetToStopped(jobIDs.ToList()); + } + else + { + await jobDAL.SetToStoppedAsync(jobIDs.ToList()); + } + jobDAL.SetCachedProgressStatusAsync(jobIDs, JobStatus.Stopped); //redis cached progress + jobDAL.DeleteCachedProgressAsync(jobIDs); + } + + /// + /// Cleanup and synchronize running jobs and jobs table. + /// * Job is deleted based on AutoDeletePeriod and AutoDeleteStatus settings. + /// * Mark job as an error, when job status is "RUNNING" in DB table, but there is no actual running thread in the related server process (Zombie Jobs). + /// * Remove thread references in memory, when job is deleted or status in DB is: stopped, error, or completed. + /// + public async Task CleanUpAsync(bool isSync) + { + if(isSync) + { + StopJobsAsync(isSync).GetAwaiter().GetResult(); + + //Delete past completed jobs from storage + if (config.AutoDeletePeriod != null) + { + jobDAL.Delete(config.AutoDeletePeriod.Value, config.AutoDeleteStatus); + } + } + else + { + await StopJobsAsync(isSync); + + //Delete past completed jobs from storage + if (config.AutoDeletePeriod != null) + { + await jobDAL.DeleteAsync(config.AutoDeletePeriod.Value, config.AutoDeleteStatus); + } + } + + //Get all running process + var jobList = isSync ? jobDAL.GetJobsByProcessAndStatus(workerProcessID, JobStatus.Running) : await jobDAL.GetJobsByProcessAndStatusAsync(workerProcessID, JobStatus.Running); + foreach (var job in jobList) + { + if (!taskList.ContainsKey(job.JobID)) + { + //Doesn't exist anymore? + var error = "Error: No actual running job process found. Try reset and run again."; + var processID = string.IsNullOrWhiteSpace(job.ProcessID) ? workerProcessID : job.ProcessID; + var count = isSync ? SetErrorAsync(processID, job.JobID, error, isSync).GetAwaiter().GetResult() + : await SetErrorAsync(processID, job.JobID, error, isSync); + } + } + + if (taskList.Count > 0) + { + var inDBjobIDs = new List(); + jobList = isSync ? jobDAL.GetJobs(taskList.Keys.ToList()) : await jobDAL.GetJobsAsync(taskList.Keys.ToList()); //get all jobs in taskList + + // If jobs doesn't even exists in storage (zombie?), remove from taskList. + inDBjobIDs = jobList.Select(j => j.JobID).ToList(); + var taskListKeys = new List(taskList.Keys); //copy keys before removal + foreach (var jobID in taskListKeys) + { + if (!inDBjobIDs.Contains(jobID)) + { + TaskInfo taskInfo = null; + if (taskList.Keys.Contains(jobID)) + taskInfo = taskList[jobID]; + else + continue; + + taskInfo.TokenSource.Cancel(); //attempt to cancel + taskList.Remove(jobID); + } + } + + // For job status that is stopped, error, completed => Remove from thread list, no need to keep track of them anymore. + var statuses = new List + { + (int)JobStatus.Stopped, + (int)JobStatus.Error, + (int)JobStatus.Completed + }; + + foreach (var job in jobList) + { + if (job.Status != null + && statuses.Contains((int)job.Status) + && taskList.ContainsKey(job.JobID)) + { + var taskInfo = taskList[job.JobID]; + taskInfo.TokenSource.Dispose(); + taskList.Remove(job.JobID); + } + } + + } + + } + + } + +} diff --git a/Shift/packages.config b/Shift/packages.config index 4e42c71..b96da10 100644 --- a/Shift/packages.config +++ b/Shift/packages.config @@ -1,6 +1,6 @@  - + - + \ No newline at end of file