日韩欧美国产精品免费一二-日韩欧美国产精品亚洲二区-日韩欧美国产精品专区-日韩欧美国产另-日韩欧美国产免费看-日韩欧美国产免费看清风阁

LOGO OA教程 ERP教程 模切知識交流 PMS教程 CRM教程 開發(fā)文檔 其他文檔  
 
網(wǎng)站管理員

.NET 異步并發(fā)操作,只保留最后一次操作

freeflydom
2024年9月5日 11:35 本文熱度 1556

在我們業(yè)務(wù)操作時,難免會有多次操作,比如:界面搜索框,輸入內(nèi)容時時查詢數(shù)據(jù)庫/后臺數(shù)據(jù)。多次觸發(fā)搜索,我們期望什么結(jié)果呢?絕大部分情況,應(yīng)該是只需要最后一次操作的結(jié)果,其它操作應(yīng)該無效。

自定義等待的任務(wù)類

1. 可等待的任務(wù)類 AwaitableTask:

/// <summary>

    /// 可等待的任務(wù)

    /// </summary>

    public class AwaitableTask

    {

        /// <summary>

        /// 獲取任務(wù)是否為不可執(zhí)行狀態(tài)

        /// </summary>

        public bool NotExecutable { get; private set; }


        /// <summary>

        /// 設(shè)置任務(wù)不可執(zhí)行

        /// </summary>

        public void SetNotExecutable()

        {

            NotExecutable = true;

        }


        /// <summary>

        /// 獲取任務(wù)是否有效

        /// 注:對無效任務(wù),可以不做處理。減少并發(fā)操作導(dǎo)致的干擾

        /// </summary>

        public bool IsInvalid { get; private set; } = true;


        /// <summary>

        /// 標(biāo)記任務(wù)無效

        /// </summary>

        public void MarkTaskValid()

        {

            IsInvalid = false;

        }


        #region Task


        private readonly Task _task;

        /// <summary>

        /// 初始化可等待的任務(wù)。

        /// </summary>

        /// <param name="task"></param>

        public AwaitableTask(Task task) => _task = task;


        /// <summary>

        /// 獲取任務(wù)是否已完成

        /// </summary>

        public bool IsCompleted => _task.IsCompleted;


        /// <summary>

        /// 任務(wù)的Id

        /// </summary>

        public int TaskId => _task.Id;


        /// <summary>

        /// 開始任務(wù)

        /// </summary>

        public void Start() => _task.Start();


        /// <summary>

        /// 同步執(zhí)行開始任務(wù)

        /// </summary>

        public void RunSynchronously() => _task.RunSynchronously();


        #endregion


        #region TaskAwaiter


        /// <summary>

        /// 獲取任務(wù)等待器

        /// </summary>

        /// <returns></returns>

        public TaskAwaiter GetAwaiter() => new TaskAwaiter(this);


        /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary>

        [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]

        public struct TaskAwaiter : INotifyCompletion

        {

            private readonly AwaitableTask _task;


            /// <summary>

            /// 任務(wù)等待器

            /// </summary>

            /// <param name="awaitableTask"></param>

            public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask;


            /// <summary>

            /// 任務(wù)是否完成.

            /// </summary>

            public bool IsCompleted => _task._task.IsCompleted;


            /// <inheritdoc />

            public void OnCompleted(Action continuation)

            {

                var This = this;

                _task._task.ContinueWith(t =>

                {

                    if (!This._task.NotExecutable) continuation?.Invoke();

                });

            }

            /// <summary>

            /// 獲取任務(wù)結(jié)果

            /// </summary>

            public void GetResult() => _task._task.Wait();

        }


        #endregion


    }

無效的操作可以分為以下倆種:

已經(jīng)進行中的操作,后續(xù)結(jié)果應(yīng)標(biāo)記為無效

還沒開始的操作,后續(xù)不執(zhí)行


自定義任務(wù)類型 AwaitableTask中,添加倆個字段NotExecutable、IsInvalid:

/// <summary>

    /// 獲取任務(wù)是否為不可執(zhí)行狀態(tài)

    /// </summary>

    public bool NotExecutable { get; private set; }

    /// <summary>

    /// 獲取任務(wù)是否有效

    /// 注:對無效任務(wù),可以不做處理。減少并發(fā)操作導(dǎo)致的干擾

    /// </summary>

    public bool IsInvalid { get; private set; } = true;


2. 有返回結(jié)果的可等待任務(wù)類 AwaitableTask<TResult>:

/// <summary>

    /// 可等待的任務(wù)

    /// </summary>

    /// <typeparam name="TResult"></typeparam>

    public class AwaitableTask<TResult> : AwaitableTask

    {

        private readonly Task<TResult> _task;

        /// <summary>

        /// 初始化可等待的任務(wù)

        /// </summary>

        /// <param name="task">需要執(zhí)行的任務(wù)</param>

        public AwaitableTask(Task<TResult> task) : base(task) => _task = task;


        #region TaskAwaiter


        /// <summary>

        /// 獲取任務(wù)等待器

        /// </summary>

        /// <returns></returns>

        public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);


        /// <summary>

        /// 任務(wù)等待器

        /// </summary>

        [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]

        public new struct TaskAwaiter : INotifyCompletion

        {

            private readonly AwaitableTask<TResult> _task;


            /// <summary>

            /// 初始化任務(wù)等待器

            /// </summary>

            /// <param name="awaitableTask"></param>

            public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;


            /// <summary>

            /// 任務(wù)是否已完成。

            /// </summary>

            public bool IsCompleted => _task._task.IsCompleted;


            /// <inheritdoc />

            public void OnCompleted(Action continuation)

            {

                var This = this;

                _task._task.ContinueWith(t =>

                {

                    if (!This._task.NotExecutable) continuation?.Invoke();

                });

            }


            /// <summary>

            /// 獲取任務(wù)結(jié)果。

            /// </summary>

            /// <returns></returns>

            public TResult GetResult() => _task._task.Result;

        }


        #endregion

    }


添加任務(wù)等待器,同步等待結(jié)果返回:

/// <summary>

    /// 獲取任務(wù)等待器

    /// </summary>

    /// <returns></returns>

    public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);


    /// <summary>

    /// 任務(wù)等待器

    /// </summary>

    [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]

    public new struct TaskAwaiter : INotifyCompletion

    {

        private readonly AwaitableTask<TResult> _task;


        /// <summary>

        /// 初始化任務(wù)等待器

        /// </summary>

        /// <param name="awaitableTask"></param>

        public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;


        /// <summary>

        /// 任務(wù)是否已完成。

        /// </summary>

        public bool IsCompleted => _task._task.IsCompleted;


        /// <inheritdoc />

        public void OnCompleted(Action continuation)

        {

            var This = this;

            _task._task.ContinueWith(t =>

            {

                if (!This._task.NotExecutable) continuation?.Invoke();

            });

        }


        /// <summary>

        /// 獲取任務(wù)結(jié)果。

        /// </summary>

        /// <returns></returns>

        public TResult GetResult() => _task._task.Result;

    }


異步任務(wù)隊列

/// <summary>

    /// 異步任務(wù)隊列

    /// </summary>

    public class AsyncTaskQueue : IDisposable

    {

        /// <summary>

        /// 異步任務(wù)隊列

        /// </summary>

        public AsyncTaskQueue()

        {

            _autoResetEvent = new AutoResetEvent(false);

            _thread = new Thread(InternalRunning) { IsBackground = true };

            _thread.Start();

        }


        #region 執(zhí)行


        /// <summary>

        /// 執(zhí)行異步操作

        /// </summary>

        /// <typeparam name="T">返回結(jié)果類型</typeparam>

        /// <param name="func">異步操作</param>

        /// <returns>isInvalid:異步操作是否有效;result:異步操作結(jié)果</returns>

        public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func)

        {

            var task = GetExecutableTask(func);

            var result = await await task;

            if (!task.IsInvalid)

            {

                result = default(T);

            }

            return (task.IsInvalid, result);

        }


        /// <summary>

        /// 執(zhí)行異步操作

        /// </summary>

        /// <typeparam name="T"></typeparam>

        /// <param name="func"></param>

        /// <returns></returns>

        public async Task<bool> ExecuteAsync<T>(Func<Task> func)

        {

            var task = GetExecutableTask(func);

            await await task;

            return task.IsInvalid;

        }


        #endregion


        #region 添加任務(wù)


        /// <summary>

        /// 獲取待執(zhí)行任務(wù)

        /// </summary>

        /// <param name="action"></param>

        /// <returns></returns>

        private AwaitableTask GetExecutableTask(Action action)

        {

            var awaitableTask = new AwaitableTask(new Task(action));

            AddPenddingTaskToQueue(awaitableTask);

            return awaitableTask;

        }


        /// <summary>

        /// 獲取待執(zhí)行任務(wù)

        /// </summary>

        /// <typeparam name="TResult"></typeparam>

        /// <param name="function"></param>

        /// <returns></returns>

        private AwaitableTask<TResult> GetExecutableTask<TResult>(Func<TResult> function)

        {

            var awaitableTask = new AwaitableTask<TResult>(new Task<TResult>(function));

            AddPenddingTaskToQueue(awaitableTask);

            return awaitableTask;

        }


        /// <summary>

        /// 添加待執(zhí)行任務(wù)到隊列

        /// </summary>

        /// <param name="task"></param>

        /// <returns></returns>

        private void AddPenddingTaskToQueue(AwaitableTask task)

        {

            //添加隊列,加鎖。

            lock (_queue)

            {

                _queue.Enqueue(task);

                //開始執(zhí)行任務(wù)

                _autoResetEvent.Set();

            }

        }


        #endregion


        #region 內(nèi)部運行


        private void InternalRunning()

        {

            while (!_isDisposed)

            {

                if (_queue.Count == 0)

                {

                    //等待后續(xù)任務(wù)

                    _autoResetEvent.WaitOne();

                }

                while (TryGetNextTask(out var task))

                {

                    //如已從隊列中刪除

                    if (task.NotExecutable) continue;


                    if (UseSingleThread)

                    {

                        task.RunSynchronously();

                    }

                    else

                    {

                        task.Start();

                    }

                }

            }

        }

        /// <summary>

        /// 上一次異步操作

        /// </summary>

        private AwaitableTask _lastDoingTask;

        private bool TryGetNextTask(out AwaitableTask task)

        {

            task = null;

            while (_queue.Count > 0)

            {

                //獲取并從隊列中移除任務(wù)

                if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0))

                {

                    //設(shè)置進行中的異步操作無效

                    _lastDoingTask?.MarkTaskValid();

                    _lastDoingTask = task;

                    return true;

                }

                //并發(fā)操作,設(shè)置任務(wù)不可執(zhí)行

                task.SetNotExecutable();

            }

            return false;

        }


        #endregion


        #region dispose


        /// <inheritdoc />

        public void Dispose()

        {

            Dispose(true);

            GC.SuppressFinalize(this);

        }


        /// <summary>

        /// 析構(gòu)任務(wù)隊列

        /// </summary>

        ~AsyncTaskQueue() => Dispose(false);


        private void Dispose(bool disposing)

        {

            if (_isDisposed) return;

            if (disposing)

            {

                _autoResetEvent.Dispose();

            }

            _thread = null;

            _autoResetEvent = null;

            _isDisposed = true;

        }


        #endregion


        #region 屬性及字段


        /// <summary>

        /// 是否使用單線程完成任務(wù).

        /// </summary>

        public bool UseSingleThread { get; set; } = true;


        /// <summary>

        /// 自動取消以前的任務(wù)。

        /// </summary>

        public bool AutoCancelPreviousTask { get; set; } = false;


        private bool _isDisposed;

        private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>();

        private Thread _thread;

        private AutoResetEvent _autoResetEvent;


        #endregion

 

添加異步任務(wù)隊列類,用于任務(wù)的管理,如添加、執(zhí)行、篩選等:

1. 自動取消之前的任務(wù) AutoCancelPreviousTask

內(nèi)部使用線程,循環(huán)獲取當(dāng)前任務(wù)列表,如果當(dāng)前任務(wù)被標(biāo)記NotExecutable不可執(zhí)行,則跳過。

NotExecutable是何時標(biāo)記的?

獲取任務(wù)時,標(biāo)記所有獲取的任務(wù)為NotExecutable。直到任務(wù)列表中為空,那么只執(zhí)行最后獲取的一個任務(wù)。

2. 標(biāo)記已經(jīng)進行的任務(wù)無效 MarkTaskValid

當(dāng)前進行的任務(wù),無法中止,那么標(biāo)記為無效即可。

/// <summary>

    /// 上一次異步操作

    /// </summary>

    private AwaitableTask _lastDoingTask;

    private bool TryGetNextTask(out AwaitableTask task)

    {

        task = null;

        while (_queue.Count > 0)

        {

            //獲取并從隊列中移除任務(wù)

            if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0))

            {

                //設(shè)置進行中的異步操作無效

                _lastDoingTask?.MarkTaskValid();

                _lastDoingTask = task;

                return true;

            }

            //并發(fā)操作,設(shè)置任務(wù)不可執(zhí)行

            task.SetNotExecutable();

        }

        return false;

    }


后續(xù)執(zhí)行完后,根據(jù)此標(biāo)記,設(shè)置操作結(jié)果為空。

/// <summary>

    /// 執(zhí)行異步操作

    /// </summary>

    /// <typeparam name="T">返回結(jié)果類型</typeparam>

    /// <param name="func">異步操作</param>

    /// <returns>isInvalid:異步操作是否有效;result:異步操作結(jié)果</returns>

    public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func)

    {

        var task = GetExecutableTask(func);

        var result = await await task;

        if (!task.IsInvalid)

        {

            result = default(T);

        }

        return (task.IsInvalid, result);

    }


實踐測試

啟動9個并發(fā)任務(wù),測試實際的任務(wù)隊列并發(fā)操作管理:

public MainWindow()

    {

        InitializeComponent();

        _asyncTaskQueue = new AsyncTaskQueue

        {

            AutoCancelPreviousTask = true,

            UseSingleThread = true

        };

    }

    private AsyncTaskQueue _asyncTaskQueue;

    private void ButtonBase_OnClick(object sender, RoutedEventArgs e)

    {

        // 快速啟動9個任務(wù)

        for (var i = 1; i < 10; i++)

        {

            Test(_asyncTaskQueue, i);

        }

    }

    public static async void Test(AsyncTaskQueue taskQueue, int num)

    {

        var result = await taskQueue.ExecuteAsync(async () =>

        {

            Debug.WriteLine("輸入:" + num);

            // 長時間耗時任務(wù)

            await Task.Delay(TimeSpan.FromSeconds(5));

            return num * 100;

        });

        Debug.WriteLine($"{num}輸出的:" + result);

    }

測試結(jié)果如下:

一共9次操作,只有最后一次操作結(jié)果,才是有效的。其它8次操作,一次是無效的,7次操作被取消不執(zhí)行。

固定時間間隔只保留最后操作

實際業(yè)務(wù)過程中,大部分場景并不是瞬間丟過來N個任務(wù),而是比如100ms內(nèi)有20個操作觸發(fā)。

這類高并發(fā)操作,不止是上方做一個隊列獲取首尾任務(wù),還需要加個延時(時間間隔),減少并發(fā)操作的執(zhí)行,可以基于上方TryGetNextTask方法操作:

/// <summary>

        /// 獲取下一任務(wù)

        /// </summary>

        /// <returns></returns>

        private async Task<AwaitableTask> TryGetNextTask()

        {

            //獲取并從隊列中移除任務(wù)

            if (_queue.TryDequeue(out var task))

            {

                //設(shè)置進行中的異步操作無效

                foreach (var lastDoingTask in _lastDoingTasks)

                {

                    lastDoingTask.MarkTaskInvalid();

                }

                _lastDoingTasks.Add(task);

                //如果有中間任務(wù),則立即取消

                if (_queue.Count != 0)

                {

                    task.SetNotExecutable();

                }

                else

                {

                    //如果沒有中間任務(wù),則設(shè)置延遲時間再確認(rèn)是否有中間任務(wù)

                    await Task.Delay(_delay).ConfigureAwait(false);

                    if (_queue.Count != 0)

                    {

                        task.SetNotExecutable();

                    }

                }

                return task;

            }

設(shè)置延時取任務(wù)后,就能實現(xiàn)在固定的時間內(nèi)最多只會觸發(fā)一次,并且是最后一次操作。


作者:唐宋元明清2188

出處:http://www.cnblogs.com/kybs0/

本文版權(quán)歸作者和博客園共有,歡迎轉(zhuǎn)載,但未經(jīng)作者同意必須在文章頁面給出原文連接,否則保留追究法律責(zé)任的權(quán)利。



該文章在 2024/9/5 11:36:52 編輯過
關(guān)鍵字查詢
相關(guān)文章
正在查詢...
點晴ERP是一款針對中小制造業(yè)的專業(yè)生產(chǎn)管理軟件系統(tǒng),系統(tǒng)成熟度和易用性得到了國內(nèi)大量中小企業(yè)的青睞。
點晴PMS碼頭管理系統(tǒng)主要針對港口碼頭集裝箱與散貨日常運作、調(diào)度、堆場、車隊、財務(wù)費用、相關(guān)報表等業(yè)務(wù)管理,結(jié)合碼頭的業(yè)務(wù)特點,圍繞調(diào)度、堆場作業(yè)而開發(fā)的。集技術(shù)的先進性、管理的有效性于一體,是物流碼頭及其他港口類企業(yè)的高效ERP管理信息系統(tǒng)。
點晴WMS倉儲管理系統(tǒng)提供了貨物產(chǎn)品管理,銷售管理,采購管理,倉儲管理,倉庫管理,保質(zhì)期管理,貨位管理,庫位管理,生產(chǎn)管理,WMS管理系統(tǒng),標(biāo)簽打印,條形碼,二維碼管理,批號管理軟件。
點晴免費OA是一款軟件和通用服務(wù)都免費,不限功能、不限時間、不限用戶的免費OA協(xié)同辦公管理系統(tǒng)。
Copyright 2010-2025 ClickSun All Rights Reserved

主站蜘蛛池模板: 国产黄a三级三级三级看三级 | 最近中文字幕免费高清mv视频6 | 国产精品1区2区3区在线播放 | 视频区国产图片区小说区 | 国产人成午夜免电影费观看 | 欧美亚洲综合另类视频 | 亚洲成年看片在线观看 | 欧美大肥婆大肥bbbbb | 国产喷水大秀在线观看2025 | 日本在线高清不卡免费播放 | 桃色影视国产一区二区三区 | 国产午夜不卡 | 国产一区鲁鲁在线视频免费播放 | 人成精品视频三区二区一区 | 亚洲成l人在线观看线路 | 国产区综合 | 亚洲欧美日韩不卡在线观看 | 日本黄页免费大片在线观看 | 欧美日韩亚洲视频精品 | 久精品国产欧美亚洲色a大片 | 人片在线观看 | 欧美综合自拍亚洲综合图区 | 国产午夜福利在线永久视频 | 日韩欧美一级大片 | 好吊日在线视频观看97 | 挤奶在线 | 一区二区三区四区在线播放视频 | 欧美午夜不卡在线观看最新 | 日韩精品一区二区三线 | 国产精品爽爽ⅴa在线观看 欧美精品一区二区三区免费 | 日本天堂天v在线播放 | 亚洲日本一线产区和二线产 | 国产乱子伦不卡视频 | 年轻人在线无毒不卡 | 免费一级中文字 | 国产探花在线观看 | 秋霞电影 | 欧美亚洲国产清纯综合图区 | 二三区成人影片 | 亚洲卡一卡二卡三乱草莓 | 日本三级理论一区二区三区 |