2.4 非排他锁 Semaphore :并发线程数限制

导读

在本章中,将学 Semaphore 和 SemaphoreSlim,两者都可以限制同时访问某一资源或资源池的线程数,实现并发时限制具体数量的线程进行并发操作。与 lock 不同的时,Semaphore 运行多个线程同时执行相同的区域代码,因此称为非排他锁。

在本章中,我们从案例入手,通过示例代码,慢慢深入了解。

Semaphore 类

这里,先列出 Semaphore 类常用的 API。

其构造函数如下:

构造函数 说明
Semaphore(Int32, Int32) 初始化 Semaphore 类的新实例,并指定初始入口数和最大并发入口数。
Semaphore(Int32, Int32, String) 初始化 Semaphore 类的新实例,并指定初始入口数和最大并发入口数,根据需要指定系统信号灯对象的名称。
Semaphore(Int32, Int32, String, Boolean) 初始化 Semaphore 类的新实例,并指定初始入口数和最大并发入口数,还可以选择指定系统信号量对象的名称,以及指定一个变量来接收指示是否创建了新系统信号量的值。

Semaphore 使用纯粹的内核时间(kernel-time)方式(等待时间很短),并且支持在不同的进程间同步线程(像Mutex)。

Semaphore 常用方法如下:

方法 说明
Close() 释放由当前 WaitHandle占用的所有资源。
OpenExisting(String) 打开指定名称为信号量(如果已经存在)。
Release() 退出信号量并返回前一个计数。
Release(Int32) 以指定的次数退出信号量并返回前一个计数。
TryOpenExisting(String, Semaphore) 打开指定名称为信号量(如果已经存在),并返回指示操作是否成功的值。
WaitOne() 阻止当前线程,直到当前 WaitHandle 收到信号。
WaitOne(Int32) 阻止当前线程,直到当前 WaitHandle 收到信号,同时使用 32 位带符号整数指定时间间隔(以毫秒为单位)。
WaitOne(Int32, Boolean) 阻止当前线程,直到当前的 WaitHandle 收到信号为止,同时使用 32 位带符号整数指定时间间隔,并指定是否在等待之前退出同步域。
WaitOne(TimeSpan) 阻止当前线程,直到当前实例收到信号,同时使用 TimeSpan 指定时间间隔。
WaitOne(TimeSpan, Boolean) 阻止当前线程,直到当前实例收到信号为止,同时使用 TimeSpan 指定时间间隔,并指定是否在等待之前退出同步域。

示例

我们来直接写代码,这里使用 《原子操作 Interlocked》 中的示例,现在我们要求,采用多个线程执行计算,但是只允许最多三个线程同时执行运行。

使用 Semaphore ,有四个步骤:

  1. new 实例化 Semaphore,并设置最大线程数、初始化时可进入线程数;

  2. 使用 .WaitOne(); 获取进入权限(在获得进入权限前,线程处于阻塞状态)。

  3. 离开时使用 Release() 释放占用。

  4. Close() 释放Semaphore 对象。

《原子操作 Interlocked》 中的示例改进如下:

    class Program
    {
        // 求和
        private static int sum = 0;
        private static Semaphore _pool;

        // 判断十个线程是否结束了。
        private static int isComplete = 0;
        // 第一个程序
        static void Main(string[] args)
        {
            Console.WriteLine("执行程序");

            // 设置允许最大三个线程进入资源池
            // 一开始设置为0,就是初始化时允许几个线程进入
            // 这里设置为0,后面按下按键时,可以放通三个线程
            _pool = new Semaphore(0, 3);
            for (int i = 0; i < 10; i++)
            {
                Thread thread = new Thread(new ParameterizedThreadStart(AddOne));
                thread.Start(i + 1);
            }
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("任意按下键(不要按关机键),可以打开资源池");
            Console.ForegroundColor = ConsoleColor.White;
            Console.ReadKey();

            // 准许三个线程进入
            _pool.Release(3);

            // 这里没有任何意义,就单纯为了演示查看结果。
            // 等待所有线程完成任务
            while (true)
            {
                if (isComplete >= 10)
                    break;
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }
            Console.WriteLine("sum = " + sum);

            // 释放池
            _pool.Close();

        }

        public static void AddOne(object n)
        {
            Console.WriteLine($"    线程{(int)n}启动,进入队列");
            // 进入队列等待
            _pool.WaitOne();
            Console.WriteLine($"第{(int)n}个线程进入资源池");
            // 进入资源池
            for (int i = 0; i < 10; i++)
            {
                Interlocked.Add(ref sum, 1);
                Thread.Sleep(TimeSpan.FromMilliseconds(500));
            }
            // 解除占用的资源池
            _pool.Release();
            isComplete += 1;
            Console.WriteLine($"                     第{(int)n}个线程退出资源池");
        }
    }

Semaphoregif

示例说明

实例化 Semaphore 使用了new Semaphore(0,3); ,其构造函数原型为

public Semaphore(int initialCount, int maximumCount);

initialCount 表示一开始允许几个进程进入资源池,如果设置为0,所有线程都不能进入,要一直等资源池放通。

maximumCount 表示最大允许几个线程进入资源池。

Release() 表示退出信号量并返回前一个计数。这个计数指的是资源池还可以进入多少个线程。

可以看一下下面的示例:

        private static Semaphore _pool;
        static void Main(string[] args)
        {
            _pool = new Semaphore(0, 5);
            _pool.Release(5);
            new Thread(AddOne).Start();
            Thread.Sleep(TimeSpan.FromSeconds(10));
            _pool.Close();
        }

        public static void AddOne()
        {
            _pool.WaitOne();
            Thread.Sleep(1000);
            int count = _pool.Release();
            Console.WriteLine("在此线程退出资源池前,资源池还有多少线程可以进入?" + count);
        }

信号量

前面我们学习到 Mutex,这个类是全局操作系统起作用的。我们从 Mutex 和 Semphore 中,也看到了 信号量这个东西,用于进程同步。

信号量分为两种类型:本地信号量和命名系统信号量。

  • 命名系统信号量在整个操作系统中均可见,可用于同步进程的活动。

  • 局部信号量仅存在于进程内。

当 name 为 null 或者为空时,Mutex 的信号量时局部信号量,否则 Mutex 的信号量是命名系统信号量。Semaphore 的话,也是两种方式都有。

如果使用接受名称的构造函数创建 Semaphor 对象,则该对象将与该名称的操作系统信号量关联。

两个构造函数:

Semaphore(Int32, Int32, String)
Semaphore(Int32, Int32, String, Boolean)

上面的构造函数可以创建多个表示同一命名系统信号量的 Semaphore 对象,并可以使用 OpenExisting 方法打开现有的已命名系统信号量。

我们上面使用的示例就是局部信号量,进程中引用本地 Semaphore 对象的所有线程都可以使用。 每个 Semaphore 对象都是单独的本地信号量。

可以利用 Semaphore 限制一个程序最多能够同时运行多少个。

SemaphoreSlim类

SemaphoreSlim 跟 Semaphore 有啥关系?

我看一下书再回答你。

file

哦哦哦,微软文档说:

SemaphoreSlim 表示对可同时访问资源或资源池的线程数加以限制的 Semaphore 的轻量替代。

SemaphoreSlim 不使用信号量,不支持进程间同步,只能在进程内使用。

它有两个构造函数:

构造函数 说明
SemaphoreSlim(Int32) 初始化 SemaphoreSlim 类的新实例,以指定可同时授予的请求的初始数量。
SemaphoreSlim(Int32, Int32) 初始化 SemaphoreSlim 类的新实例,同时指定可同时授予的请求的初始数量和最大数量。

示例

我们改造一下前面 Semaphore 中的示例:

    class Program
    {
        // 求和
        private static int sum = 0;
        private static SemaphoreSlim _pool;

        // 判断十个线程是否结束了。
        private static int isComplete = 0;
        static void Main(string[] args)
        {
            Console.WriteLine("执行程序");

            // 设置允许最大三个线程进入资源池
            // 一开始设置为0,就是初始化时允许几个线程进入
            // 这里设置为0,后面按下按键时,可以放通三个线程
            _pool = new SemaphoreSlim(0, 3);
            for (int i = 0; i < 10; i++)
            {
                Thread thread = new Thread(new ParameterizedThreadStart(AddOne));
                thread.Start(i + 1);
            }

            Console.WriteLine("任意按下键(不要按关机键),可以打开资源池");
            Console.ReadKey();
            // 
            _pool.Release(3);

            // 这里没有任何意义,就单纯为了演示查看结果。
            // 等待所有线程完成任务
            while (true)
            {
                if (isComplete >= 10)
                    break;
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }
            Console.WriteLine("sum = " + sum);
            // 释放池
        }

        public static void AddOne(object n)
        {
            Console.WriteLine($"    线程{(int)n}启动,进入队列");
            // 进入队列等待
            _pool.Wait();
            Console.WriteLine($"第{(int)n}个线程进入资源池");
            // 进入资源池
            for (int i = 0; i < 10; i++)
            {
                Interlocked.Add(ref sum, 1);
                Thread.Sleep(TimeSpan.FromMilliseconds(200));
            }
            // 解除占用的资源池
            _pool.Release();
            isComplete += 1;
            Console.WriteLine($"                     第{(int)n}个线程退出资源池");
        }
    }

SemaphoreSlim 不需要 Close()

两者在代码上的区别是就这么简单。

区别

如果使用下面的构造函数实例化 Semaphor(参数name不能为空),那么创建的对象在整个操作系统内都有效

public Semaphore (int initialCount, int maximumCount, string name);

Semaphorslim 则只在进程内有效,SemaphoreSlim 是对 Semaphore 的简单封装。

SemaphoreSlim 类不会对 WaitWaitAsyncRelease 方法的调用强制执行线程或任务标识。

而 Semaphor 类,会对此进行严格监控,如果对应调用数量不一致,会出现异常。

此外,如果使用 SemaphoreSlim(Int32 maximumCount) 构造函数来实例化 SemaphoreSlim 对象,获取其 CurrentCount 属性,其值可能会大于 maximumCount。 编程人员应负责确保调用一个 Wait 或 WaitAsync 方法,便调用一个 Release。

这就好像笔筒里面的笔,没有监控,使用这使用完毕后,都应该将笔放进去。如果原先有10支笔,每次使用不放进去,或者将别的地方的笔放进去,那么最后数量就不是10了。因此使用时需要注意捕获异常,合理释放锁。

file

Copyright © 痴者工良 2022 all right reserved,powered by Gitbook文档最后更新时间: 2023-05-29 06:30:53

results matching ""

    No results matching ""