From 72c194f7db2e2e7b7263b14a7ca4688b095a8ab1 Mon Sep 17 00:00:00 2001 From: Patrick Simpson Date: Wed, 17 Apr 2019 13:37:27 +0300 Subject: [PATCH] Added backgrouund respawn task executor --- .../AcaciaZPushPlugin.csproj | 1 + .../AcaciaZPushPlugin/DebugOptions.cs | 3 +- .../Features/Signatures/FeatureSignatures.cs | 14 +- .../AcaciaZPushPlugin/Utils/Tasks.cs | 3 + .../Utils/TasksBackgroundRespawn.cs | 140 ++++++++++++++++++ 5 files changed, 156 insertions(+), 5 deletions(-) create mode 100644 src/AcaciaZPushPlugin/AcaciaZPushPlugin/Utils/TasksBackgroundRespawn.cs diff --git a/src/AcaciaZPushPlugin/AcaciaZPushPlugin/AcaciaZPushPlugin.csproj b/src/AcaciaZPushPlugin/AcaciaZPushPlugin/AcaciaZPushPlugin.csproj index a7c56df..1531559 100644 --- a/src/AcaciaZPushPlugin/AcaciaZPushPlugin/AcaciaZPushPlugin.csproj +++ b/src/AcaciaZPushPlugin/AcaciaZPushPlugin/AcaciaZPushPlugin.csproj @@ -392,6 +392,7 @@ + diff --git a/src/AcaciaZPushPlugin/AcaciaZPushPlugin/DebugOptions.cs b/src/AcaciaZPushPlugin/AcaciaZPushPlugin/DebugOptions.cs index d7c5109..478a07d 100644 --- a/src/AcaciaZPushPlugin/AcaciaZPushPlugin/DebugOptions.cs +++ b/src/AcaciaZPushPlugin/AcaciaZPushPlugin/DebugOptions.cs @@ -260,7 +260,8 @@ namespace Acacia { MainThread, Background, - Synchronous + Synchronous, + BackgroundRespawn } #region Access methods diff --git a/src/AcaciaZPushPlugin/AcaciaZPushPlugin/Features/Signatures/FeatureSignatures.cs b/src/AcaciaZPushPlugin/AcaciaZPushPlugin/Features/Signatures/FeatureSignatures.cs index f21ee07..b07dc85 100644 --- a/src/AcaciaZPushPlugin/AcaciaZPushPlugin/Features/Signatures/FeatureSignatures.cs +++ b/src/AcaciaZPushPlugin/AcaciaZPushPlugin/Features/Signatures/FeatureSignatures.cs @@ -213,13 +213,19 @@ namespace Acacia.Features.Signatures // Set default signatures if available and none are set if (!string.IsNullOrEmpty(result.new_message) && ShouldSetSignature(account.SignatureNewMessage)) { - Logger.Instance.Trace(this, "Setting signature new message: {0}: {1}", account, result.new_message); - account.SignatureNewMessage = fullNames[result.new_message]; + Tasks.Task(new AcaciaTask(null, this, "SignatureNewMessage", () => + { + Logger.Instance.Trace(this, "Setting signature new message: {0}: {1}", account, result.new_message); + account.SignatureNewMessage = fullNames[result.new_message]; + })); } if (!string.IsNullOrEmpty(result.replyforward_message) && ShouldSetSignature(account.SignatureReplyForwardMessage)) { - Logger.Instance.Trace(this, "Setting signature reply message: {0}: {1}", account, result.replyforward_message); - account.SignatureReplyForwardMessage = fullNames[result.replyforward_message]; + Tasks.Task(new AcaciaTask(null, this, "SignatureReplyForwardMessage", () => + { + Logger.Instance.Trace(this, "Setting signature reply message: {0}: {1}", account, result.replyforward_message); + account.SignatureReplyForwardMessage = fullNames[result.replyforward_message]; + })); } Logger.Instance.Trace(this, "Signature synced: {0}: {1}", account, result.hash); diff --git a/src/AcaciaZPushPlugin/AcaciaZPushPlugin/Utils/Tasks.cs b/src/AcaciaZPushPlugin/AcaciaZPushPlugin/Utils/Tasks.cs index 93cc371..cc09b5c 100644 --- a/src/AcaciaZPushPlugin/AcaciaZPushPlugin/Utils/Tasks.cs +++ b/src/AcaciaZPushPlugin/AcaciaZPushPlugin/Utils/Tasks.cs @@ -147,6 +147,9 @@ namespace Acacia.Utils case DebugOptions.Threading.Background: _executor = new TasksBackground(); break; + case DebugOptions.Threading.BackgroundRespawn: + _executor = new TasksBackgroundRespawn(); + break; } if (GlobalOptions.INSTANCE.TaskTrace) diff --git a/src/AcaciaZPushPlugin/AcaciaZPushPlugin/Utils/TasksBackgroundRespawn.cs b/src/AcaciaZPushPlugin/AcaciaZPushPlugin/Utils/TasksBackgroundRespawn.cs new file mode 100644 index 0000000..f9addd6 --- /dev/null +++ b/src/AcaciaZPushPlugin/AcaciaZPushPlugin/Utils/TasksBackgroundRespawn.cs @@ -0,0 +1,140 @@ + +/// Copyright 2019 Kopano b.v. +/// +/// This program is free software: you can redistribute it and/or modify +/// it under the terms of the GNU Affero General Public License, version 3, +/// as published by the Free Software Foundation. +/// +/// This program is distributed in the hope that it will be useful, +/// but WITHOUT ANY WARRANTY; without even the implied warranty of +/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +/// GNU Affero General Public License for more details. +/// +/// You should have received a copy of the GNU Affero General Public License +/// along with this program.If not, see. +/// +/// Consult LICENSE file for details +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Acacia.Features.DebugSupport; + +namespace Acacia.Utils +{ + public class TasksBackgroundRespawn : TaskExecutor + { + private readonly BlockingCollection _tasks = new BlockingCollection(); + public static int TIMEOUT_MS = 5000; + + public TasksBackgroundRespawn() + { + Thread t = new Thread(Watcher); + t.SetApartmentState(ApartmentState.STA); + t.Start(); + } + + private void Watcher() + { + while (!_tasks.IsCompleted) + { + WorkerThread worker = new WorkerThread(this); + worker.Run(); + } + } + + private enum State + { + Dequeue, + Execute, + Cancel + } + private class WorkerThread + { + private readonly TasksBackgroundRespawn _tasks; + + + private int state; + private int counter; + + public WorkerThread(TasksBackgroundRespawn tasks) + { + this._tasks = tasks; + } + + private void Worker() + { + try + { + while (!_tasks._tasks.IsCompleted) + { + // Check if we need to stop + if (Interlocked.Exchange(ref state, (int)State.Dequeue) == (int)State.Cancel) + { + Logger.Instance.Error(this, "Worker cancelled"); + break; + } + + Logger.Instance.Trace(this, "Take task 1"); + AcaciaTask task = _tasks._tasks.Take(); + Logger.Instance.Trace(this, "Take task 2: {0}", task); + + // Set the state and increment the counter + Interlocked.Exchange(ref state, (int)State.Execute); + Interlocked.Increment(ref counter); + + // Perform the task + _tasks.PerformTask(task); + + Logger.Instance.Trace(this, "Take task 3: {0}", task); + } + Logger.Instance.Debug(this, "Worker completed"); + } + catch (Exception e) + { + Logger.Instance.Warning(this, "Worker failure: {0}", e); + } + } + + public void Run() + { + // Start the thread + Thread t = new Thread(Worker); + t.SetApartmentState(ApartmentState.STA); + t.Start(); + + int lastCount = 0; + + // Check for time out + for(;;) + { + Thread.Sleep(TIMEOUT_MS); + + int count = counter; + if (state == (int)State.Execute && lastCount == count) + { + // Have been hanging in this task + break; + } + lastCount = count; + } + + // Cancel + Interlocked.Exchange(ref state, (int)State.Cancel); + } + } + + + protected override void EnqueueTask(AcaciaTask task) + { + Logger.Instance.Trace(this, "EnqueueTask 1: {0}", task); + _tasks.Add(task); + Logger.Instance.Trace(this, "EnqueueTask 2: {0}", task); + } + + override public string Name { get { return "Background"; } } + } +}