IT俱乐部 ASP.NET .NET Framework 4.8 多线程编程最佳实践

.NET Framework 4.8 多线程编程最佳实践

.NET Framework 4.8 多线程编程

1.Task基础编程

  • Task的多种创建方式(Task.Run、Task.Factory.StartNew等)
  • 连续任务(ContinueWith)和任务链
  • Task异常处理机制
  • Task的状态管理

2.Async/Await模式

  • 基础async/await使用方法
  • 异步方法的返回值处理
  • 并发执行多个异步操作(Task.WhenAll、Task.WhenAny)
  • 异步异常处理
  • CancellationToken取消机制

3.高级异步模式

  • TaskCompletionSource创建自定义异步操作
  • 异步信号量(SemaphoreSlim)
  • 自定义异步锁实现
  • 异步生产者-消费者模式
  • 限流的批量异步操作

4.并行编程(Parallel和PLINQ)

  • Parallel.For和Parallel.ForEach循环
  • Parallel.Invoke并行执行多个操作
  • ParallelOptions控制并行度
  • PLINQ查询和数据处理
  • 并行聚合操作
  • 性能对比示例

5.数据流(TPL Dataflow)

  • TransformBlock、ActionBlock等数据流块
  • 构建数据处理管道
  • BroadcastBlock一对多广播
  • BatchBlock批处理
  • JoinBlock数据合并

6.异步编程最佳实践

  • 避免async void(除事件处理器外)
  • ConfigureAwait的正确使用
  • 避免阻塞异步代码(防死锁)
  • ValueTask性能优化
  • 正确处理多个异步操作
  • IAsyncDisposable资源清理

关键技术对比:

传统Thread vs 现代Task

特性 Thread Task
创建开销 低(使用线程池)
返回值 不支持 Task支持
异常处理 困难 内置支持
组合操作 手动实现 WhenAll/WhenAny
取消机制 手动实现 CancellationToken

同步 vs 异步编程模式

场景 同步(Thread/Lock) 异步(async/await)
I/O操作 阻塞线程 不阻塞,高效
CPU密集 适合 配合Task.Run
可读性 复杂 简洁
调试 困难 相对简单
性能 线程开销大 资源利用率高

使用场景建议:

使用async/await的场景:

  • Web API调用
  • 数据库操作
  • 文件I/O
  • 网络通信
  • UI响应性要求高的应用

使用Parallel/PLINQ的场景:

  • 大数据集处理
  • CPU密集型计算
  • 批量数据转换
  • 并行算法实现

使用传统Thread的场景:

  • 需要精确控制线程属性
  • 长时间运行的后台任务
  • 与旧代码库集成
  • 特殊的线程优先级需求

性能优化建议:

  1. 优先使用async/await:避免阻塞线程,提高资源利用率
  2. 使用ConfigureAwait(false):在库代码中避免捕获上下文
  3. 控制并发度:使用SemaphoreSlim或ParallelOptions限制并发
  4. 选择合适的数据结构:使用Concurrent集合处理并发访问
  5. 避免过度并行化:评估并行化的收益是否大于开销

这份完整的指南现在涵盖了.NET Framework 4.8中从基础Thread到现代async/await的全部并发编程技术

一、线程基础概念

在.NET Framework 4.8中,多线程编程主要涉及以下几个核心类:

  • Thread – 线程的基本类
  • ThreadPool – 线程池
  • Task – 任务并行库(TPL)
  • 各种同步原语(lockMonitorMutexSemaphore等)

二、线程的生命周期控制

1. 线程的创建与启动

using System;
using System.Threading;
public class ThreadLifecycleExample
{
    // 线程工作方法
    private static void WorkerMethod(object state)
    {
        string threadName = (string)state;
        Console.WriteLine($"[{threadName}] 线程开始执行,线程ID: {Thread.CurrentThread.ManagedThreadId}");
        for (int i = 0; i  WorkerMethod("Thread1")));
        // 方式2:使用ParameterizedThreadStart委托
        Thread thread2 = new Thread(new ParameterizedThreadStart(WorkerMethod));
        // 方式3:直接传递方法
        Thread thread3 = new Thread(() => WorkerMethod("Thread3"));
        // 设置线程属性
        thread1.Name = "Worker-1";
        thread1.IsBackground = false; // 前台线程
        thread2.Name = "Worker-2";
        thread2.IsBackground = true;  // 后台线程
        // 启动线程
        thread1.Start();
        thread2.Start("Thread2");
        thread3.Start();
        // 等待线程完成
        thread1.Join();
        thread2.Join();
        thread3.Join();
    }
}

2. 线程的暂停、继续与停止

using System;
using System.Threading;
public class ThreadControlExample
{
    private static ManualResetEvent pauseEvent = new ManualResetEvent(true);
    private static ManualResetEvent shutdownEvent = new ManualResetEvent(false);
    private static CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    // 可控制的工作线程
    private static void ControllableWorker(object state)
    {
        string workerName = (string)state;
        CancellationToken token = cancellationTokenSource.Token;
        Console.WriteLine($"[{workerName}] 线程启动");
        try
        {
            while (!token.IsCancellationRequested)
            {
                // 等待暂停信号
                pauseEvent.WaitOne();
                // 检查是否需要退出
                if (token.IsCancellationRequested)
                    break;
                // 执行工作
                Console.WriteLine($"[{workerName}] 正在工作... 时间: {DateTime.Now:HH:mm:ss}");
                // 模拟工作负载
                Thread.Sleep(1000);
                // 检查停止信号
                if (WaitHandle.WaitAny(new WaitHandle[] { shutdownEvent }, 0) == 0)
                {
                    Console.WriteLine($"[{workerName}] 收到停止信号");
                    break;
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"[{workerName}] 操作被取消");
        }
        finally
        {
            Console.WriteLine($"[{workerName}] 线程退出");
        }
    }
    public static void ThreadControlDemo()
    {
        Thread workerThread = new Thread(ControllableWorker)
        {
            Name = "ControlledWorker",
            IsBackground = false
        };
        // 启动线程
        workerThread.Start("Worker");
        // 让线程运行3秒
        Thread.Sleep(3000);
        // 暂停线程
        Console.WriteLine("n[主线程] 暂停工作线程...");
        pauseEvent.Reset();
        Thread.Sleep(2000);
        // 继续线程
        Console.WriteLine("[主线程] 恢复工作线程...");
        pauseEvent.Set();
        Thread.Sleep(3000);
        // 停止线程(推荐方式:使用CancellationToken)
        Console.WriteLine("n[主线程] 停止工作线程...");
        cancellationTokenSource.Cancel();
        // 等待线程结束
        workerThread.Join(5000);
        if (workerThread.IsAlive)
        {
            Console.WriteLine("[主线程] 强制终止线程");
            workerThread.Abort(); // 注意:不推荐使用,仅作为最后手段
        }
    }
}

三、线程间通信机制

1. 共享内存通信

using System;
using System.Threading;
using System.Collections.Concurrent;
public class SharedMemoryCommunication
{
    // 共享数据结构
    private static readonly ConcurrentQueue messageQueue = new ConcurrentQueue();
    private static readonly object lockObject = new object();
    private static int sharedCounter = 0;
    // 生产者线程
    private static void Producer(object state)
    {
        string producerName = (string)state;
        Random random = new Random();
        for (int i = 0; i = 20 && messageQueue.IsEmpty)
                        break;
                }
            }
        }
        Console.WriteLine($"[消费者 {consumerName}] 退出");
    }
    public static void RunSharedMemoryExample()
    {
        Thread producer1 = new Thread(Producer) { Name = "Producer1" };
        Thread producer2 = new Thread(Producer) { Name = "Producer2" };
        Thread consumer1 = new Thread(Consumer) { Name = "Consumer1" };
        Thread consumer2 = new Thread(Consumer) { Name = "Consumer2" };
        producer1.Start("P1");
        producer2.Start("P2");
        consumer1.Start("C1");
        consumer2.Start("C2");
        producer1.Join();
        producer2.Join();
        consumer1.Join();
        consumer2.Join();
    }
}

2. 事件通信机制

using System;
using System.Threading;
public class EventCommunication
{
    // 自动重置事件
    private static AutoResetEvent autoEvent = new AutoResetEvent(false);
    // 手动重置事件
    private static ManualResetEvent manualEvent = new ManualResetEvent(false);
    // 倒计时事件
    private static CountdownEvent countdownEvent = new CountdownEvent(3);
    // 等待事件的工作线程
    private static void WaitingWorker(object state)
    {
        string workerName = (string)state;
        Console.WriteLine($"[{workerName}] 等待事件信号...");
        // 等待自动重置事件
        autoEvent.WaitOne();
        Console.WriteLine($"[{workerName}] 收到自动重置事件信号");
        // 等待手动重置事件
        manualEvent.WaitOne();
        Console.WriteLine($"[{workerName}] 收到手动重置事件信号");
        // 通知倒计时事件
        countdownEvent.Signal();
        Console.WriteLine($"[{workerName}] 完成工作");
    }
    public static void RunEventExample()
    {
        Thread[] workers = new Thread[3];
        for (int i = 0; i 

3. 信号量通信

using System;
using System.Threading;
public class SemaphoreCommunication
{
    // 信号量:最多允许3个线程同时访问
    private static Semaphore semaphore = new Semaphore(3, 3);
    // 互斥量
    private static Mutex mutex = new Mutex();
    // 使用信号量控制的工作方法
    private static void SemaphoreWorker(object state)
    {
        string workerName = (string)state;
        Console.WriteLine($"[{workerName}] 等待进入临界区...");
        semaphore.WaitOne();
        try
        {
            Console.WriteLine($"[{workerName}] 进入临界区,开始工作");
            Thread.Sleep(2000); // 模拟工作
            Console.WriteLine($"[{workerName}] 完成工作");
        }
        finally
        {
            Console.WriteLine($"[{workerName}] 离开临界区");
            semaphore.Release();
        }
    }
    // 使用互斥量的工作方法
    private static void MutexWorker(object state)
    {
        string workerName = (string)state;
        Console.WriteLine($"[{workerName}] 尝试获取互斥量...");
        mutex.WaitOne();
        try
        {
            Console.WriteLine($"[{workerName}] 获得互斥量,独占访问资源");
            Thread.Sleep(1000);
            Console.WriteLine($"[{workerName}] 完成独占访问");
        }
        finally
        {
            mutex.ReleaseMutex();
            Console.WriteLine($"[{workerName}] 释放互斥量");
        }
    }
    public static void RunSemaphoreExample()
    {
        Console.WriteLine("=== 信号量示例 ===");
        Thread[] semaphoreThreads = new Thread[6];
        for (int i = 0; i 

四、线程保护与同步

1. Lock和Monitor

using System;
using System.Threading;
public class ThreadSynchronization
{
    private static readonly object lockObject = new object();
    private static int balance = 1000;
    // 使用lock语句
    private static void TransferWithLock(int amount, string from, string to)
    {
        lock (lockObject)
        {
            Console.WriteLine($"[{Thread.CurrentThread.Name}] 转账开始: {from} -> {to}, 金额: {amount}");
            if (balance >= amount)
            {
                balance -= amount;
                Thread.Sleep(100); // 模拟处理时间
                Console.WriteLine($"[{Thread.CurrentThread.Name}] 转账成功,余额: {balance}");
            }
            else
            {
                Console.WriteLine($"[{Thread.CurrentThread.Name}] 余额不足,转账失败");
            }
        }
    }
    // 使用Monitor类(更灵活的控制)
    private static void TransferWithMonitor(int amount, string from, string to)
    {
        bool lockTaken = false;
        try
        {
            Monitor.TryEnter(lockObject, TimeSpan.FromSeconds(1), ref lockTaken);
            if (lockTaken)
            {
                Console.WriteLine($"[{Thread.CurrentThread.Name}] 获得锁,执行转账");
                if (balance >= amount)
                {
                    balance -= amount;
                    Thread.Sleep(100);
                    Console.WriteLine($"[{Thread.CurrentThread.Name}] 转账成功,余额: {balance}");
                    // 通知等待的线程
                    Monitor.Pulse(lockObject);
                }
                else
                {
                    Console.WriteLine($"[{Thread.CurrentThread.Name}] 余额不足,等待...");
                    Monitor.Wait(lockObject, 1000); // 等待最多1秒
                }
            }
            else
            {
                Console.WriteLine($"[{Thread.CurrentThread.Name}] 无法获得锁");
            }
        }
        finally
        {
            if (lockTaken)
            {
                Monitor.Exit(lockObject);
            }
        }
    }
    public static void RunSynchronizationExample()
    {
        Thread[] threads = new Thread[5];
        for (int i = 0; i  TransferWithLock(amount, "AccountA", "AccountB"))
            {
                Name = $"Transfer-Thread-{i + 1}"
            };
        }
        foreach (var thread in threads)
            thread.Start();
        foreach (var thread in threads)
            thread.Join();
        Console.WriteLine($"n最终余额: {balance}");
    }
}

2. ReaderWriterLock(读写锁)

using System;
using System.Threading;
using System.Collections.Generic;
public class ReaderWriterExample
{
    private static ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
    private static Dictionary cache = new Dictionary();
    private static Random random = new Random();
    // 读操作
    private static void ReadData(object state)
    {
        string readerName = (string)state;
        for (int i = 0; i 

3. 线程安全集合

using System;
using System.Threading;
using System.Collections.Concurrent;
using System.Threading.Tasks;
public class ThreadSafeCollections
{
    // 线程安全的集合
    private static ConcurrentDictionary concurrentDict = new ConcurrentDictionary();
    private static ConcurrentQueue concurrentQueue = new ConcurrentQueue();
    private static ConcurrentStack concurrentStack = new ConcurrentStack();
    private static ConcurrentBag concurrentBag = new ConcurrentBag();
    private static BlockingCollection blockingCollection = new BlockingCollection(10);
    // 使用BlockingCollection的生产者-消费者模式
    private static void BlockingProducer()
    {
        for (int i = 0; i  oldValue + 1);
        // 获取或添加
        int value = concurrentDict.GetOrAdd("key2", 2);
        // 尝试获取
        if (concurrentDict.TryGetValue("key1", out int result))
        {
            Console.WriteLine($"获取值: key1 = {result}");
        }
        // 尝试移除
        if (concurrentDict.TryRemove("key1", out int removed))
        {
            Console.WriteLine($"移除值: key1 = {removed}");
        }
    }
    public static void RunThreadSafeCollectionExample()
    {
        Console.WriteLine("=== BlockingCollection 示例 ===");
        Thread producer = new Thread(BlockingProducer);
        Thread consumer1 = new Thread(BlockingConsumer);
        Thread consumer2 = new Thread(BlockingConsumer);
        producer.Start();
        consumer1.Start("Consumer1");
        consumer2.Start("Consumer2");
        producer.Join();
        consumer1.Join();
        consumer2.Join();
        Console.WriteLine("n=== ConcurrentDictionary 示例 ===");
        ConcurrentDictionaryExample();
    }
}

五、高级线程同步:Barrier和SpinLock

using System;
using System.Threading;
using System.Threading.Tasks;
public class AdvancedSynchronization
{
    // Barrier:同步多个线程的执行阶段
    private static Barrier barrier = new Barrier(3, (b) =>
    {
        Console.WriteLine($"n[Barrier] 阶段 {b.CurrentPhaseNumber} 完成,所有线程已同步n");
    });
    // SpinLock:自旋锁(适用于短时间的锁定)
    private static SpinLock spinLock = new SpinLock();
    private static int spinLockCounter = 0;
    // 使用Barrier的多阶段任务
    private static void BarrierTask(object state)
    {
        string taskName = (string)state;
        for (int phase = 0; phase 

六、完整示例程序

using System;
using System.Threading;
public class Program
{
    public static void Main(string[] args)
    {
        while (true)
        {
            Console.Clear();
            Console.WriteLine("===== .NET Framework 4.8 多线程编程示例 =====");
            Console.WriteLine("1. 基础线程操作");
            Console.WriteLine("2. 线程控制(暂停/继续/停止)");
            Console.WriteLine("3. 共享内存通信");
            Console.WriteLine("4. 事件通信机制");
            Console.WriteLine("5. 信号量和互斥量");
            Console.WriteLine("6. 线程同步(Lock/Monitor)");
            Console.WriteLine("7. 读写锁示例");
            Console.WriteLine("8. 线程安全集合");
            Console.WriteLine("9. 高级同步(Barrier/SpinLock)");
            Console.WriteLine("0. 退出");
            Console.Write("n请选择示例 (0-9): ");
            string choice = Console.ReadLine();
            Console.Clear();
            switch (choice)
            {
                case "1":
                    ThreadLifecycleExample.BasicThreadExample();
                    break;
                case "2":
                    ThreadControlExample.ThreadControlDemo();
                    break;
                case "3":
                    SharedMemoryCommunication.RunSharedMemoryExample();
                    break;
                case "4":
                    EventCommunication.RunEventExample();
                    break;
                case "5":
                    SemaphoreCommunication.RunSemaphoreExample();
                    break;
                case "6":
                    ThreadSynchronization.RunSynchronizationExample();
                    break;
                case "7":
                    ReaderWriterExample.RunReaderWriterExample();
                    break;
                case "8":
                    ThreadSafeCollections.RunThreadSafeCollectionExample();
                    break;
                case "9":
                    AdvancedSynchronization.RunAdvancedSynchronizationExample();
                    break;
                case "0":
                    return;
                default:
                    Console.WriteLine("无效选择");
                    break;
            }
            Console.WriteLine("n按任意键继续...");
            Console.ReadKey();
        }
    }
}

七、最佳实践与注意事项

1. 线程安全最佳实践

  • 优先使用高级同步原语:Task、async/await比直接使用Thread更安全
  • 避免死锁:始终以相同的顺序获取多个锁
  • 最小化锁的范围:只在必要的代码段使用锁
  • 使用不可变对象:减少同步需求
  • 优先使用线程安全集合:ConcurrentCollection系列

2. 性能优化建议

  • 使用线程池:避免频繁创建销毁线程
  • 合理设置线程数量:通常为CPU核心数的1-2倍
  • 避免过度同步:评估是否真的需要线程保护
  • 使用SpinLock处理短时间锁定:减少上下文切换
  • 使用ReaderWriterLock处理读多写少场景

3. 避免的常见错误

  • 不要使用Thread.Abort():可能导致资源泄漏
  • 避免使用Thread.Suspend/Resume:已过时且不安全
  • 不要忽略线程异常:使用try-catch保护线程代码
  • 避免在锁内调用未知代码:可能导致死锁
  • 不要在构造函数中启动线程:对象可能未完全初始化

4. 调试技巧

// 使用线程名称便于调试
Thread.CurrentThread.Name = "WorkerThread";
// 使用Trace输出线程信息
System.Diagnostics.Trace.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: Starting work");
// 使用条件断点调试特定线程
if (Thread.CurrentThread.Name == "SpecificThread")
{
    System.Diagnostics.Debugger.Break();
}

八、异步编程(Async/Await)

1. Task基础

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Http;
using System.IO;
public class TaskBasics
{
    // 创建和运行Task的不同方式
    public static void TaskCreationExamples()
    {
        Console.WriteLine("=== Task创建示例 ===");
        // 方式1:使用Task.Run
        Task task1 = Task.Run(() =>
        {
            Console.WriteLine($"Task1 运行在线程 {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(1000);
            Console.WriteLine("Task1 完成");
        });
        // 方式2:使用Task.Factory.StartNew(更多控制选项)
        Task task2 = Task.Factory.StartNew(() =>
        {
            Console.WriteLine($"Task2 运行在线程 {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(500);
        }, CancellationToken.None, 
           TaskCreationOptions.LongRunning, 
           TaskScheduler.Default);
        // 方式3:使用new Task(需要手动启动)
        Task task3 = new Task(() =>
        {
            Console.WriteLine($"Task3 运行在线程 {Thread.CurrentThread.ManagedThreadId}");
        });
        task3.Start();
        // 方式4:返回结果的Task
        Task task4 = Task.Run(() =>
        {
            Console.WriteLine("Task4 计算中...");
            Thread.Sleep(1000);
            return 42;
        });
        // 等待所有任务完成
        Task.WaitAll(task1, task2, task3, task4);
        Console.WriteLine($"Task4 结果: {task4.Result}");
    }
    // Task的连续任务(ContinueWith)
    public static void TaskContinuationExample()
    {
        Console.WriteLine("n=== Task连续任务示例 ===");
        Task firstTask = Task.Run(() =>
        {
            Console.WriteLine("第一个任务执行中...");
            Thread.Sleep(1000);
            return 10;
        });
        // 单个连续任务
        Task continuationTask = firstTask.ContinueWith(antecedent =>
        {
            Console.WriteLine($"第一个任务结果: {antecedent.Result}");
            return antecedent.Result * 2;
        });
        // 多个连续任务链
        continuationTask
            .ContinueWith(t => 
            {
                Console.WriteLine($"第二个连续任务,结果: {t.Result}");
                return t.Result + 5;
            })
            .ContinueWith(t => 
            {
                Console.WriteLine($"最终结果: {t.Result}");
            });
        // 条件连续任务
        Task faultedTask = Task.Run(() => 
        {
            throw new Exception("任务失败");
        });
        faultedTask.ContinueWith(t =>
        {
            Console.WriteLine($"任务失败: {t.Exception?.GetBaseException().Message}");
        }, TaskContinuationOptions.OnlyOnFaulted);
        faultedTask.ContinueWith(t =>
        {
            Console.WriteLine("任务成功完成");
        }, TaskContinuationOptions.OnlyOnRanToCompletion);
        Thread.Sleep(3000);
    }
    // Task异常处理
    public static void TaskExceptionHandling()
    {
        Console.WriteLine("n=== Task异常处理示例 ===");
        // 方式1:使用Wait或Result捕获异常
        Task taskWithError = Task.Run(() =>
        {
            throw new InvalidOperationException("任务中的异常");
        });
        try
        {
            taskWithError.Wait();
        }
        catch (AggregateException ae)
        {
            foreach (var ex in ae.InnerExceptions)
            {
                Console.WriteLine($"捕获异常: {ex.Message}");
            }
        }
        // 方式2:使用ContinueWith处理异常
        Task.Run(() =>
        {
            throw new Exception("另一个异常");
        }).ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                Console.WriteLine($"任务失败: {t.Exception?.Flatten().InnerException?.Message}");
            }
        });
        Thread.Sleep(1000);
    }
}

2. Async/Await模式

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Diagnostics;
public class AsyncAwaitPatterns
{
    // 基础async/await使用
    public static async Task BasicAsyncExample()
    {
        Console.WriteLine("=== 基础Async/Await示例 ===");
        Console.WriteLine($"开始方法,线程: {Thread.CurrentThread.ManagedThreadId}");
        // 异步延迟
        await Task.Delay(1000);
        Console.WriteLine($"延迟后,线程: {Thread.CurrentThread.ManagedThreadId}");
        // 异步运行CPU密集型任务
        int result = await Task.Run(() =>
        {
            Console.WriteLine($"Task.Run内部,线程: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(500);
            return 42;
        });
        Console.WriteLine($"结果: {result}");
    }
    // 异步方法返回值
    public static async Task GetDataAsync(string input)
    {
        Console.WriteLine($"获取数据: {input}");
        await Task.Delay(1000);
        return $"处理结果: {input.ToUpper()}";
    }
    // 并发执行多个异步操作
    public static async Task ConcurrentAsyncOperations()
    {
        Console.WriteLine("n=== 并发异步操作示例 ===");
        Stopwatch sw = Stopwatch.StartNew();
        // 顺序执行(较慢)
        string result1 = await GetDataAsync("first");
        string result2 = await GetDataAsync("second");
        string result3 = await GetDataAsync("third");
        sw.Stop();
        Console.WriteLine($"顺序执行耗时: {sw.ElapsedMilliseconds}ms");
        sw.Restart();
        // 并发执行(较快)
        Task task1 = GetDataAsync("first");
        Task task2 = GetDataAsync("second");
        Task task3 = GetDataAsync("third");
        string[] results = await Task.WhenAll(task1, task2, task3);
        sw.Stop();
        Console.WriteLine($"并发执行耗时: {sw.ElapsedMilliseconds}ms");
        Console.WriteLine($"结果: {string.Join(", ", results)}");
    }
    // 异步流处理(使用IAsyncEnumerable需要.NET Core 3.0+,这里使用Task模拟)
    public static async Task> GetNumbersAsync()
    {
        List numbers = new List();
        for (int i = 0; i 

3. 高级异步模式

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Collections.Generic;
using System.Linq;
public class AdvancedAsyncPatterns
{
    // 使用TaskCompletionSource创建自定义异步操作
    public static Task CreateCustomAsyncOperation()
    {
        TaskCompletionSource tcs = new TaskCompletionSource();
        // 模拟异步操作
        Timer timer = null;
        timer = new Timer(_ =>
        {
            tcs.SetResult(42);
            timer?.Dispose();
        }, null, 1000, Timeout.Infinite);
        return tcs.Task;
    }
    // 异步信号量(SemaphoreSlim)
    private static SemaphoreSlim semaphore = new SemaphoreSlim(2, 2);
    public static async Task AsyncSemaphoreExample()
    {
        Console.WriteLine("=== 异步信号量示例 ===");
        List tasks = new List();
        for (int i = 0; i 
            {
                await semaphore.WaitAsync();
                try
                {
                    Console.WriteLine($"任务 {taskId} 进入临界区");
                    await Task.Delay(2000);
                    Console.WriteLine($"任务 {taskId} 离开临界区");
                }
                finally
                {
                    semaphore.Release();
                }
            }));
        }
        await Task.WhenAll(tasks);
    }
    // 异步锁(AsyncLock实现)
    public class AsyncLock
    {
        private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
        public async Task LockAsync()
        {
            await semaphore.WaitAsync();
            return new LockReleaser(semaphore);
        }
        private class LockReleaser : IDisposable
        {
            private readonly SemaphoreSlim semaphore;
            public LockReleaser(SemaphoreSlim semaphore)
            {
                this.semaphore = semaphore;
            }
            public void Dispose()
            {
                semaphore.Release();
            }
        }
    }
    // 使用异步锁
    private static AsyncLock asyncLock = new AsyncLock();
    private static int sharedResource = 0;
    public static async Task AsyncLockExample()
    {
        Console.WriteLine("n=== 异步锁示例 ===");
        List tasks = new List();
        for (int i = 0; i 
            {
                using (await asyncLock.LockAsync())
                {
                    int temp = sharedResource;
                    await Task.Delay(10);
                    sharedResource = temp + 1;
                    Console.WriteLine($"共享资源值: {sharedResource}");
                }
            }));
        }
        await Task.WhenAll(tasks);
        Console.WriteLine($"最终值: {sharedResource}");
    }
    // 异步生产者-消费者模式
    public static async Task ProducerConsumerAsync()
    {
        Console.WriteLine("n=== 异步生产者-消费者模式 ===");
        // 使用Channel作为队列(.NET Core 3.0+)
        // 这里使用BlockingCollection模拟
        var queue = new System.Collections.Concurrent.BlockingCollection(10);
        // 生产者
        Task producer = Task.Run(async () =>
        {
            for (int i = 0; i 
        {
            while (!queue.IsCompleted)
            {
                if (queue.TryTake(out int item, 100))
                {
                    Console.WriteLine($"消费: {item}");
                    await Task.Delay(200);
                }
            }
        });
        await Task.WhenAll(producer, consumer);
    }
    // 批量异步操作with限流
    public static async Task ThrottledAsyncOperations()
    {
        Console.WriteLine("n=== 限流异步操作示例 ===");
        // 要处理的数据
        List data = Enumerable.Range(1, 20).ToList();
        // 最大并发数
        int maxConcurrency = 3;
        SemaphoreSlim throttler = new SemaphoreSlim(maxConcurrency);
        List tasks = new List();
        foreach (var item in data)
        {
            tasks.Add(Task.Run(async () =>
            {
                await throttler.WaitAsync();
                try
                {
                    Console.WriteLine($"开始处理: {item}");
                    await ProcessItemAsync(item);
                    Console.WriteLine($"完成处理: {item}");
                }
                finally
                {
                    throttler.Release();
                }
            }));
        }
        await Task.WhenAll(tasks);
    }
    private static async Task ProcessItemAsync(int item)
    {
        await Task.Delay(1000);
    }
}

4. 并行编程(Parallel类和PLINQ)

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;
using System.Collections.Concurrent;
using System.Diagnostics;
public class ParallelProgramming
{
    // Parallel.For和Parallel.ForEach
    public static void ParallelLoops()
    {
        Console.WriteLine("=== Parallel循环示例 ===");
        // Parallel.For
        Console.WriteLine("nParallel.For示例:");
        Parallel.For(0, 10, i =>
        {
            Console.WriteLine($"处理索引 {i}, 线程: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(100);
        });
        // Parallel.ForEach
        Console.WriteLine("nParallel.ForEach示例:");
        List items = new List { "A", "B", "C", "D", "E", "F", "G", "H" };
        Parallel.ForEach(items, item =>
        {
            Console.WriteLine($"处理项目 {item}, 线程: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(200);
        });
        // 带有ParallelOptions的控制
        Console.WriteLine("n带选项的Parallel.ForEach:");
        ParallelOptions options = new ParallelOptions
        {
            MaxDegreeOfParallelism = 2,
            CancellationToken = CancellationToken.None
        };
        Parallel.ForEach(items, options, item =>
        {
            Console.WriteLine($"限制并发处理 {item}, 线程: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(500);
        });
    }
    // Parallel.Invoke
    public static void ParallelInvoke()
    {
        Console.WriteLine("n=== Parallel.Invoke示例 ===");
        Parallel.Invoke(
            () => DoWork("任务1", 1000),
            () => DoWork("任务2", 1500),
            () => DoWork("任务3", 800),
            () => DoWork("任务4", 1200)
        );
        Console.WriteLine("所有并行任务完成");
    }
    private static void DoWork(string taskName, int delay)
    {
        Console.WriteLine($"{taskName} 开始, 线程: {Thread.CurrentThread.ManagedThreadId}");
        Thread.Sleep(delay);
        Console.WriteLine($"{taskName} 完成");
    }
    // PLINQ(并行LINQ)
    public static void PLINQExamples()
    {
        Console.WriteLine("n=== PLINQ示例 ===");
        // 生成测试数据
        List numbers = Enumerable.Range(1, 100).ToList();
        // 基本PLINQ查询
        var parallelQuery = numbers
            .AsParallel()
            .Where(n => n % 2 == 0)
            .Select(n => n * n)
            .ToList();
        Console.WriteLine($"偶数平方结果数量: {parallelQuery.Count}");
        // 控制并行度
        var controlledQuery = numbers
            .AsParallel()
            .WithDegreeOfParallelism(2)
            .Where(n => IsPrime(n))
            .ToList();
        Console.WriteLine($"质数数量: {controlledQuery.Count}");
        // 保持顺序
        var orderedQuery = numbers
            .AsParallel()
            .AsOrdered()
            .Where(n => n > 50)
            .Select(n => n * 2)
            .Take(10)
            .ToList();
        Console.WriteLine($"有序结果: {string.Join(", ", orderedQuery)}");
        // 聚合操作
        int sum = numbers
            .AsParallel()
            .Where(n => n % 3 == 0)
            .Sum();
        Console.WriteLine($"能被3整除的数之和: {sum}");
        // 性能比较
        Stopwatch sw = Stopwatch.StartNew();
        // 顺序执行
        var sequentialResult = numbers
            .Where(n => ExpensiveOperation(n))
            .ToList();
        sw.Stop();
        Console.WriteLine($"顺序执行时间: {sw.ElapsedMilliseconds}ms");
        sw.Restart();
        // 并行执行
        var parallelResult = numbers
            .AsParallel()
            .Where(n => ExpensiveOperation(n))
            .ToList();
        sw.Stop();
        Console.WriteLine($"并行执行时间: {sw.ElapsedMilliseconds}ms");
    }
    private static bool IsPrime(int number)
    {
        if (number  0.0, // 局部初始值
            (number, loop, localTotal) => // 局部计算
            {
                return localTotal + Math.Sqrt(number);
            },
            localTotal => // 合并局部结果
            {
                lock (lockObj)
                {
                    total += localTotal;
                }
            }
        );
        Console.WriteLine($"并行聚合结果: {total:F2}");
        // 使用PLINQ聚合
        double plinqTotal = numbers
            .AsParallel()
            .Select(n => Math.Sqrt(n))
            .Sum();
        Console.WriteLine($"PLINQ聚合结果: {plinqTotal:F2}");
    }
}

5. 数据流(Dataflow)和管道模式

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Collections.Generic;
public class DataflowPipeline
{
    // 简单的数据流管道
    public static async Task SimpleDataflowExample()
    {
        Console.WriteLine("=== 数据流管道示例 ===");
        // 创建数据流块
        var transformBlock = new TransformBlock(
            async n =>
            {
                await Task.Delay(100);
                Console.WriteLine($"转换: {n} -> {n * n}");
                return $"Result: {n * n}";
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 2
            }
        );
        var actionBlock = new ActionBlock(
            async s =>
            {
                await Task.Delay(50);
                Console.WriteLine($"处理: {s}");
            }
        );
        // 链接块
        transformBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
        // 发送数据
        for (int i = 1; i (n => n);
        // 批处理块
        var batchBlock = new BatchBlock(3);
        // 转换块
        var multiplyBlock = new TransformBlock(n => n * 2);
        var addBlock = new TransformBlock(n => n + 100);
        // 合并块
        var joinBlock = new JoinBlock();
        // 动作块
        var finalBlock = new ActionBlock>(
            tuple => Console.WriteLine($"结果: {tuple.Item1}, {tuple.Item2}")
        );
        var batchPrintBlock = new ActionBlock(
            batch => Console.WriteLine($"批次: [{string.Join(", ", batch)}]")
        );
        // 构建管道
        broadcastBlock.LinkTo(multiplyBlock);
        broadcastBlock.LinkTo(addBlock);
        broadcastBlock.LinkTo(batchBlock);
        multiplyBlock.LinkTo(joinBlock.Target1);
        addBlock.LinkTo(joinBlock.Target2);
        joinBlock.LinkTo(finalBlock);
        batchBlock.LinkTo(batchPrintBlock);
        // 发送数据
        for (int i = 1; i 

6. 异步编程最佳实践

using System;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
using System.Net.Http;
public class AsyncBestPractices
{
    // 避免async void(除了事件处理器)
    // 错误示例
    public async void BadAsyncMethod()
    {
        await Task.Delay(1000);
        throw new Exception("无法捕获的异常");
    }
    // 正确示例
    public async Task GoodAsyncMethod()
    {
        await Task.Delay(1000);
        throw new Exception("可以捕获的异常");
    }
    // 配置await
    public async Task ConfigureAwaitExample()
    {
        // 不需要捕获上下文时使用ConfigureAwait(false)
        await Task.Delay(1000).ConfigureAwait(false);
        // 这在库代码中特别重要,可以提高性能
        await SomeLibraryMethodAsync().ConfigureAwait(false);
    }
    private Task SomeLibraryMethodAsync() => Task.CompletedTask;
    // 避免阻塞异步代码
    public class AvoidBlockingExample
    {
        // 错误:可能导致死锁
        public void BadExample()
        {
            var result = GetDataAsync().Result; // 避免这样做
            GetDataAsync().Wait(); // 也要避免这样
        }
        // 正确:使用async/await
        public async Task GoodExample()
        {
            var result = await GetDataAsync();
        }
        private async Task GetDataAsync()
        {
            await Task.Delay(100);
            return "data";
        }
    }
    // 使用ValueTask优化性能
    public async ValueTask GetCachedValueAsync()
    {
        // 如果有缓存值,直接返回(没有分配)
        if (_cache.ContainsKey("key"))
            return _cache["key"];
        // 否则异步获取
        var value = await ComputeValueAsync();
        _cache["key"] = value;
        return value;
    }
    private Dictionary _cache = new Dictionary();
    private async Task ComputeValueAsync()
    {
        await Task.Delay(1000);
        return 42;
    }
    // 正确处理多个异步操作
    public async Task HandleMultipleOperationsCorrectly()
    {
        // 错误:顺序等待(慢)
        var result1 = await Operation1Async();
        var result2 = await Operation2Async();
        var result3 = await Operation3Async();
        // 正确:并发执行(快)
        var task1 = Operation1Async();
        var task2 = Operation2Async();
        var task3 = Operation3Async();
        await Task.WhenAll(task1, task2, task3);
        var results = new[] { task1.Result, task2.Result, task3.Result };
    }
    private Task Operation1Async() => Task.FromResult(1);
    private Task Operation2Async() => Task.FromResult(2);
    private Task Operation3Async() => Task.FromResult(3);
    // 使用IAsyncDisposable(.NET Standard 2.1+)
    public class AsyncDisposableExample : IDisposable
    {
        private readonly HttpClient httpClient = new HttpClient();
        private readonly FileStream fileStream;
        public AsyncDisposableExample(string filePath)
        {
            fileStream = new FileStream(filePath, FileMode.Create);
        }
        public async Task WriteAsync(byte[] data)
        {
            await fileStream.WriteAsync(data, 0, data.Length);
        }
        // 异步清理资源
        public async ValueTask DisposeAsync()
        {
            if (fileStream != null)
            {
                await fileStream.FlushAsync();
                fileStream.Dispose();
            }
            httpClient?.Dispose();
        }
        public void Dispose()
        {
            fileStream?.Dispose();
            httpClient?.Dispose();
        }
    }
}

7. 完整的异步编程示例程序

public class AsyncProgrammingDemo
{
    public static async Task Main(string[] args)
    {
        while (true)
        {
            Console.Clear();
            Console.WriteLine("===== .NET异步编程示例 =====");
            Console.WriteLine("1. Task基础操作");
            Console.WriteLine("2. Task连续和异常处理");
            Console.WriteLine("3. 基础Async/Await");
            Console.WriteLine("4. 并发异步操作");
            Console.WriteLine("5. 异步取消操作");
            Console.WriteLine("6. 高级异步模式");
            Console.WriteLine("7. Parallel循环");
            Console.WriteLine("8. PLINQ示例");
            Console.WriteLine("9. 数据流管道");
            Console.WriteLine("0. 返回主菜单");
            Console.Write("n请选择 (0-9): ");
            string choice = Console.ReadLine();
            Console.Clear();
            try
            {
                switch (choice)
                {
                    case "1":
                        TaskBasics.TaskCreationExamples();
                        break;
                    case "2":
                        TaskBasics.TaskContinuationExample();
                        TaskBasics.TaskExceptionHandling();
                        break;
                    case "3":
                        await AsyncAwaitPatterns.BasicAsyncExample();
                        break;
                    case "4":
                        await AsyncAwaitPatterns.ConcurrentAsyncOperations();
                        break;
                    case "5":
                        await AsyncAwaitPatterns.CancellationExample();
                        break;
                    case "6":
                        await AdvancedAsyncPatterns.AsyncSemaphoreExample();
                        await AdvancedAsyncPatterns.AsyncLockExample();
                        break;
                    case "7":
                        ParallelProgramming.ParallelLoops();
                        ParallelProgramming.ParallelInvoke();
                        break;
                    case "8":
                        ParallelProgramming.PLINQExamples();
                        break;
                    case "9":
                        await DataflowPipeline.SimpleDataflowExample();
                        await DataflowPipeline.ComplexPipelineExample();
                        break;
                    case "0":
                        return;
                    default:
                        Console.WriteLine("无效选择");
                        break;
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"发生错误: {ex.Message}");
            }
            Console.WriteLine("n按任意键继续...");
            Console.ReadKey();
        }
    }
}

总结

.NET Framework 4.8提供了丰富的多线程和异步编程工具:

传统多线程编程

  1. 基础控制:Thread类提供了线程的创建、启动、暂停、继续和停止
  2. 线程间通信:通过共享内存、事件、信号量等多种机制实现
  3. 线程保护:Lock、Monitor、Mutex等确保线程安全
  4. 高级特性:Barrier、SpinLock、线程安全集合等提供更精细的控制

现代异步编程

  1. Task和Task:更高级的线程抽象,支持连续任务和异常处理
  2. Async/Await:简化异步编程,使代码更易读写和维护
  3. 并行编程:Parallel类和PLINQ提供数据并行和任务并行
  4. 数据流:TPL Dataflow提供构建复杂数据处理管道的能力
  5. 异步同步原语:SemaphoreSlim、异步锁等支持异步场景

选择建议

  • 优先使用Task和async/await:更现代、更安全、性能更好
  • 使用Thread类的场景:需要精细控制线程属性或与旧代码集成
  • 使用Parallel和PLINQ:处理数据并行和CPU密集型任务
  • 使用Dataflow:构建复杂的数据处理管道

选择合适的技术对于构建高效、可靠的多线程应用程序至关重要。建议从async/await开始,逐步掌握各种并发编程模式。

到此这篇关于.NET Framework 4.8 多线程编程的文章就介绍到这了,更多相关.NET Framework多线程编程内容请搜索IT俱乐部以前的文章或继续浏览下面的相关文章希望大家以后多多支持IT俱乐部!

本文收集自网络,不代表IT俱乐部立场,转载请注明出处。https://www.2it.club/code/asp-net/17151.html
上一篇
下一篇
联系我们

联系我们

在线咨询: QQ交谈

邮箱: 1120393934@qq.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

返回顶部