9.2 运用API实现线程同步

Windows 线程同步是指多个线程一同访问共享资源时,为了避免资源的并发访问导致数据的不一致或程序崩溃等问题,需要对线程的访问进行协同和控制,以保证程序的正确性和稳定性。Windows提供了多种线程同步机制,以适应不同的并发编程场景。主要包括以下几种:

  • 事件(Event):用于不同线程间的信号通知。包括单次通知事件和重复通知事件两种类型。
  • 互斥量(Mutex):用于控制对共享资源的访问,具有独占性,可避免线程之间对共享资源的非法访问。
  • 临界区(CriticalSection):和互斥量类似,也用于控制对共享资源的访问,但是是进程内部的,因此比较适用于同一进程中的线程同步控制。
  • 信号量(Semaphore):用于基于计数器机制,控制并发资源的访问数量。
  • 互锁变量(Interlocked Variable):用于对变量的并发修改操作控制,可提供一定程度的原子性操作保证。

以上同步机制各有优缺点和适用场景,开发者应根据具体应用场景进行选择和使用。在线程同步的实现过程中,需要注意竞争条件和死锁的处理,以确保程序中的线程能协同工作,共享资源能够正确访问和修改。线程同步是并发编程中的重要基础,对于开发高效、稳定的并发应用至关重要。

9.2.1 CreateEvent

CreateEvent 是Windows API提供的用于创建事件对象的函数之一,该函数用于创建一个事件对象,并返回一个表示该事件对象的句柄。可以通过SetEvent函数将该事件对象设置为有信号状态,通过ResetEevent函数将该事件对象设置为无信号状态。当使用WaitForSingleObject或者WaitForMultipleObjects函数等待事件对象时,会阻塞线程直到事件状态被置位。对于手动重置事件,需要调用ResetEvent函数手动将事件状态置位。

CreateEvent 函数常用于线程同步和进程间通信,在不同线程或者进程之间通知事件状态的改变。例如,某个线程完成了一项任务,需要通知其它等待该任务完成的线程;或者某个进程需要和另一个进程进行协调,需要通知其它进程某个事件的发生等等。

CreateEvent 函数的函数原型如下:

HANDLE CreateEvent(
  LPSECURITY_ATTRIBUTES lpEventAttributes,
  BOOL                  bManualReset,
  BOOL                  bInitialState,
  LPCTSTR               lpName
);

参数说明:

  • lpEventAttributes:指向SECURITY_ATTRIBUTES结构体的指针,指定事件对象的安全描述符和访问权限。通常设为NULL,表示使用默认值。
  • bManualReset:指定事件对象的类型,TRUE表示创建的是手动重置事件,FALSE表示创建的是自动重置事件。
  • bInitialState:指定事件对象的初始状态,TRUE表示将事件对象设为有信号状态,FALSE表示将事件对象设为无信号状态。
  • lpName:指定事件对象的名称,可以为NULL。

CreateEvent 是实现线程同步和进程通信的重要手段之一,应用广泛且易用。在第一章中我们创建的多线程环境可能会出现线程同步的问题,此时使用Event事件机制即可很好的解决,首先在初始化时通过CreateEvent将事件设置为False状态,进入ThreadFunction线程时再次通过SetEvent释放,以此即可实现线程同步顺序执行的目的。

#include <stdio.h>
#include <process.h>
#include <windows.h>

// 全局资源
long g_nNum = 0;

// 子线程个数
const int THREAD_NUM = 10;

CRITICAL_SECTION  g_csThreadCode;
HANDLE g_hThreadEvent;

unsigned int __stdcall ThreadFunction(void *ptr)
{
  int nThreadNum = *(int *)ptr;

  // 线程函数中触发事件
  SetEvent(g_hThreadEvent);

  // 进入线程锁
  EnterCriticalSection(&g_csThreadCode);
  g_nNum++;
  printf("线程编号: %d --> 全局资源值: %d --> 子线程ID: %d \n", nThreadNum, g_nNum, GetCurrentThreadId());

  // 离开线程锁
  LeaveCriticalSection(&g_csThreadCode);
  return 0;
}

int main(int argc,char * argv[])
{
  unsigned int ThreadCount = 0;
  HANDLE  handle[THREAD_NUM];

  // 初始化自动将事件设置为False
  g_hThreadEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  InitializeCriticalSection(&g_csThreadCode);

  for (int each = 0; each < THREAD_NUM; each++)
  {
    handle[each] = (HANDLE)_beginthreadex(NULL, 0, ThreadFunction, &each, 0, &ThreadCount);

    // 等待线程事件被触发
    WaitForSingleObject(g_hThreadEvent, INFINITE);
  }

  WaitForMultipleObjects(THREAD_NUM, handle, TRUE, INFINITE);

  // 销毁事件
  CloseHandle(g_hThreadEvent);
  DeleteCriticalSection(&g_csThreadCode);

  system("pause");
  return 0;
}

当然了事件对象同样可以实现更为复杂的同步机制,在如下我们在创建对象时,可以设置non-signaled状态运行的auto-reset模式,当我们设置好我们需要的参数时,可以直接使用SetEvent(hEvent)设置事件状态,则会自动执行线程函数。

要创建一个manual-reset模式并且初始状态为not-signaled的事件对象,需要按照以下步骤:

首先定义一个SECURITY_ATTRIBUTES结构体变量,设置其中的参数为NULL表示使用默认安全描述符,例如。

SECURITY_ATTRIBUTES sa = {0};
sa.nLength = sizeof(sa);
sa.lpSecurityDescriptor = NULL;
sa.bInheritHandle = FALSE;

接着调用CreateEvent函数创建事件对象,将bManualResetbInitialState参数设置为FALSE,表示创建manual-reset模式的事件对象并初始状态为not-signaled。例如:

HANDLE hEvent = CreateEvent(
                      &sa,           // 安全属性
                      TRUE,          // Manual-reset模式
                      FALSE,         // Not-signaled 初始状态
                      NULL           // 事件对象名称
                      );

这样,我们就创建了一个名为hEventmanual-reset模式的事件对象,初始状态为not-signaled。可以通过SetEvent函数将事件对象设置为signaled状态,通过ResetEvent函数将事件对象设置为non-signaled状态,也可以通过WaitForSingleObject或者WaitForMultipleObjects函数等待事件对象的状态变化。

#include <windows.h>  
#include <stdio.h>  
#include <process.h>  
#define STR_LEN 100  

// 存储全局字符串
static char str[STR_LEN];

// 设置事件句柄
static HANDLE hEvent;

// 统计字符串中是否存在A
unsigned WINAPI NumberOfA(void *arg)
{
  int cnt = 0;
  // 等待线程对象事件
  WaitForSingleObject(hEvent, INFINITE);
  for (int i = 0; str[i] != 0; i++)
  {
    if (str[i] == 'A')
      cnt++;
  }
  printf("Num of A: %d \n", cnt);
  return 0;
}

// 统计字符串总长度
unsigned WINAPI NumberOfOthers(void *arg)
{
  int cnt = 0;
  // 等待线程对象事件
  WaitForSingleObject(hEvent, INFINITE);
  for (int i = 0; str[i] != 0; i++)
  {
    if (str[i] != 'A')
      cnt++;
  }
  printf("Num of others: %d \n", cnt - 1);
  return 0;
}

int main(int argc, char *argv[])
{
  HANDLE hThread1, hThread2;

  // 以non-signaled创建manual-reset模式的事件对象
  // 该对象创建后不会被立即执行,只有我们设置状态为Signaled时才会继续
  hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

  hThread1 = (HANDLE)_beginthreadex(NULL, 0, NumberOfA, NULL, 0, NULL);
  hThread2 = (HANDLE)_beginthreadex(NULL, 0, NumberOfOthers, NULL, 0, NULL);

  fputs("Input string: ", stdout);
  fgets(str, STR_LEN, stdin);

  // 字符串读入完毕后,将事件句柄改为signaled状态  
  SetEvent(hEvent);

  WaitForSingleObject(hThread1, INFINITE);
  WaitForSingleObject(hThread2, INFINITE);

  // non-signaled 如果不更改,对象继续停留在signaled
  ResetEvent(hEvent);

  CloseHandle(hEvent);

  system("pause");
  return 0;
}

9.2.2 CreateSemaphore

CreateSemaphore 是Windows API提供的用于创建信号量的函数之一,用于控制多个线程之间对共享资源的访问数量。该函数常用于创建一个计数信号量对象,并返回一个表示该信号量对象的句柄。可以通过ReleaseSemaphore函数将该信号量对象的计数加1,通过WaitForSingleObject或者WaitForMultipleObjects函数等待信号量对象的计数变成正数以后再将其减1,以实现对共享资源访问数量的控制。

CreateSemaphore 函数常用于实现生产者消费者模型、线程池、任务队列等并发编程场景,用于限制访问共享资源的线程数量。信号量机制更多时候被用于限制资源的数量而不是限制线程的数量,但也可以用来实现一些线程同步场景。

该函数的函数原型如下:

HANDLE CreateSemaphore(
  LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,
  LONG                  lInitialCount,
  LONG                  lMaximumCount,
  LPCTSTR               lpName
);

参数说明:

  • lpSemaphoreAttributes:指向SECURITY_ATTRIBUTES结构体的指针,指定信号量对象的安全描述符和访问权限。通常设为NULL,表示使用默认值。
  • lInitialCount:指定信号量对象的初始计数,表示可以同时访问共享资源的线程数量。
  • lMaximumCount:指定信号量对象的最大计数,表示信号量对象的计数上限。
  • lpName:指定信号量对象的名称,可以为NULL。

总的来说,CreateSemaphore 是实现线程同步和进程通信,控制对共享资源的访问数量的重要手段之一,如下一段演示代码片段则通过此方法解决了线程通过问题,首先调用CreateSemaphore初始化时将信号量设置一个最大值,每次进入线程函数内部时,则ReleaseSemaphore信号自动加1,如果大于指定的数值则WaitForSingleObject等待释放信号.

#include <stdio.h>
#include <process.h>
#include <windows.h>

// 全局资源
long g_nNum = 0;

// 子线程个数
const int THREAD_NUM = 10;

CRITICAL_SECTION  g_csThreadCode;
HANDLE g_hThreadParameter;

unsigned int __stdcall ThreadFunction(void *ptr)
{
  int nThreadNum = *(int *)ptr;

  // 信号量++
  ReleaseSemaphore(g_hThreadParameter, 1, NULL);

  // 进入线程锁
  EnterCriticalSection(&g_csThreadCode);
  g_nNum++;
  printf("线程编号: %d --> 全局资源值: %d --> 子线程ID: %d \n", nThreadNum, g_nNum, GetCurrentThreadId());

  // 离开线程锁
  LeaveCriticalSection(&g_csThreadCode);
  return 0;
}

int main(int argc,char * argv[])
{
  unsigned int ThreadCount = 0;
  HANDLE  handle[THREAD_NUM];

  // 初始化信号量当前0个资源,最大允许1个同时访问
  g_hThreadParameter = CreateSemaphore(NULL, 0, 1, NULL);
  InitializeCriticalSection(&g_csThreadCode);

  for (int each = 0; each < THREAD_NUM; each++)
  {
    handle[each] = (HANDLE)_beginthreadex(NULL, 0, ThreadFunction, &each, 0, &ThreadCount);

    // 等待信号量>0
    WaitForSingleObject(g_hThreadParameter, INFINITE);
  }

  // 关闭信号
  CloseHandle(g_hThreadParameter);

  // 等待所有进程结束
  WaitForMultipleObjects(THREAD_NUM, handle, TRUE, INFINITE);
  DeleteCriticalSection(&g_csThreadCode);

  system("pause");
  return 0;
}

如下所示代码片段,是一个应用了两个线程的案例,初始化信号为0,利用信号量值为0时进入non-signaled状态,大于0时进入signaled状态的特性即可实现线程同步。

执行WaitForSingleObject(semTwo, INFINITE);会让线程函数进入类似挂起的状态,当接到ReleaseSemaphore(semOne, 1, NULL);才会恢复执行。

#include <windows.h>  
#include <stdio.h>  

static HANDLE semOne,semTwo;
static int num;

// 线程函数A用于接收参书
DWORD WINAPI ReadNumber(LPVOID lpParamter)
{
  int i;
  for (i = 0; i < 5; i++)
  {
    fputs("Input Number: ", stdout);

    // 临界区的开始 signaled状态  
    WaitForSingleObject(semTwo, INFINITE);
    
    scanf("%d", &num);

    // 临界区的结束 non-signaled状态  
    ReleaseSemaphore(semOne, 1, NULL);
  }
  return 0;
}

// 线程函数B: 用户接受参数后完成计算
DWORD WINAPI Check(LPVOID lpParamter)
{
  int sum = 0, i;
  for (i = 0; i < 5; i++)
  {
    // 临界区的开始 non-signaled状态  
    WaitForSingleObject(semOne, INFINITE);
    sum += num;

    // 临界区的结束 signaled状态  
    ReleaseSemaphore(semTwo, 1, NULL);
  }
  printf("The Number IS: %d \n", sum);
  return 0;
}

int main(int argc, char *argv[])
{
  HANDLE hThread1, hThread2;

  // 创建信号量对象,设置为0进入non-signaled状态  
  semOne = CreateSemaphore(NULL, 0, 1, NULL);

  // 创建信号量对象,设置为1进入signaled状态  
  semTwo = CreateSemaphore(NULL, 1, 1, NULL);

  hThread1 = CreateThread(NULL, 0, ReadNumber, NULL, 0, NULL);
  hThread2 = CreateThread(NULL, 0, Check, NULL, 0, NULL);

  // 关闭临界区
  WaitForSingleObject(hThread1, INFINITE);
  WaitForSingleObject(hThread2, INFINITE);

  CloseHandle(semOne);
  CloseHandle(semTwo);

  system("pause");
  return 0;
}

9.2.3 CreateMutex

CreateMutex 是Windows API提供的用于创建互斥体对象的函数之一,该函数用于创建一个互斥体对象,并返回一个表示该互斥体对象的句柄。可以通过WaitForSingleObject或者WaitForMultipleObjects函数等待互斥体对象,以确保只有一个线程能够访问共享资源,其他线程需要等待该线程释放互斥体对象后才能继续访问。当需要释放互斥体对象时,可以调用ReleaseMutex函数将其释放。

CreateMutex 函数常用于对共享资源的访问控制,避免多个线程同时访问导致数据不一致的问题。有时候,互斥体也被用于跨进程同步访问共享资源。

该函数的函数原型如下:

HANDLE CreateMutex(
  LPSECURITY_ATTRIBUTES lpMutexAttributes,
  BOOL                  bInitialOwner,
  LPCTSTR               lpName
);

参数说明:

  • lpMutexAttributes:指向SECURITY_ATTRIBUTES结构体的指针,指定互斥体对象的安全描述符和访问权限。通常设为NULL,表示使用默认值。
  • bInitialOwner:指定互斥体的初始状态,TRUE表示将互斥体设置为有所有权的状态,FALSE表示将互斥体设置为没有所有权的状态。
  • lpName:指定互斥体的名称,可以为NULL。

该函数是实现线程同步和进程通信,控制对共享资源的访问的重要手段之一,应用广泛且易用。

如下案例所示,使用互斥锁可以实现单位时间内,只允许一个线程拥有对共享资源的独占权限,从而实现了互不冲突的线程同步。

#include <windows.h>
#include <iostream>

using namespace std;

// 创建互斥锁
HANDLE hMutex = NULL;

// 线程函数
DWORD WINAPI Func(LPVOID lpParamter)
{
  for (int x = 0; x < 10; x++)
  {
    // 请求获得一个互斥锁
    WaitForSingleObject(hMutex, INFINITE);

    cout << "thread func" << endl;

    // 释放互斥锁
    ReleaseMutex(hMutex);
  }
  return 0;
}

int main(int argc,char * argv[])
{
  HANDLE hThread = CreateThread(NULL, 0, Func, NULL, 0, NULL);

  hMutex = CreateMutex(NULL, FALSE, "lyshark");
  CloseHandle(hThread);

  for (int x = 0; x < 10; x++)
  {
    // 请求获得一个互斥锁
    WaitForSingleObject(hMutex, INFINITE);
    cout << "main thread" << endl;
    
    // 释放互斥锁
    ReleaseMutex(hMutex);
  }
  system("pause");
  return 0;
}

当然通过互斥锁我们也可以实现赞单位时间内同时同步执行两个线程函数,如下代码所示;

#include <windows.h>
#include <iostream>

using namespace std;

// 创建互斥锁
HANDLE hMutex = NULL;
#define NUM_THREAD 50

// 线程函数1
DWORD WINAPI FuncA(LPVOID lpParamter)
{
  for (int x = 0; x < 10; x++)
  {
    // 请求获得一个互斥锁
    WaitForSingleObject(hMutex, INFINITE);

    cout << "this is thread func A" << endl;

    // 释放互斥锁
    ReleaseMutex(hMutex);
  }
  return 0;
}

// 线程函数2
DWORD WINAPI FuncB(LPVOID lpParamter)
{
  for (int x = 0; x < 10; x++)
  {
    // 请求获得一个互斥锁
    WaitForSingleObject(hMutex, INFINITE);

    cout << "this is thread func B" << endl;

    // 释放互斥锁
    ReleaseMutex(hMutex);
  }
  return 0;
}

int main(int argc, char * argv[])
{

  // 用来存储线程函数的句柄
  HANDLE tHandle[NUM_THREAD];

  // 创建互斥量,此时为signaled状态
  hMutex = CreateMutex(NULL, FALSE, "lyshark");

  for (int x = 0; x < NUM_THREAD; x++)
  {
    if (x % 2)
    {
      tHandle[x] = CreateThread(NULL, 0, FuncA, NULL, 0, NULL);
    }
    else
    {
      tHandle[x] = CreateThread(NULL, 0, FuncB, NULL, 0, NULL);
    }
  }

  // 等待所有线程函数执行完毕
  WaitForMultipleObjects(NUM_THREAD, tHandle, TRUE, INFINITE);
  
  // 销毁互斥对象
  CloseHandle(hMutex);

  system("pause");
  return 0;
}

9.2.4 ThreadParameters

在线程环境中,有时候启动新线程时我们需要对不同的线程传入不同的参数,通常实现线程传参的方法有许多,一般可分为使用全局变量,使用结构体,使用类的成员函数等,本节将使用结构体传参,通过创建一个结构体,将需要传递的参数存储在结构体中,并将结构体的指针传递给线程函数。子线程在执行时,可以通过该指针访问结构体中的参数。

对于简单的参数传递而言,线程函数中定义LPVOID允许传递一个参数,此时我们只需要在函数中接收并强转(int)(LPVOID)port即可获取到一个整数类型的参数,如下是一个简单的端口扫描软件代码片段。

#include <stdio.h>
#include <Windows.h>

// 线程函数接收一个参数
DWORD WINAPI ScanThread(LPVOID port)
{
  // 将参数强制转化为需要的类型
  int Port = (int)(LPVOID)port;
  printf("[+] 端口: %5d \n", port);
  return 1;
}

int main(int argc, char* argv[])
{
  HANDLE handle;

  for (int port = 0; port < 100; port++)
  {
    handle = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ScanThread, (LPVOID)port, 0, 0);
  }
  WaitForSingleObject(handle, INFINITE);

  system("pause");
  return 0;
}

当然只传递一个参数在多数情况下时不够使用的,而想在线程函数中传递多个参数,则需要传递一个结构指针,通过线程函数内部强转为结构类型后,即可实现取值,如下代码中我们首先定义了一个THREAD_PARAM结构体,该结构内有两个成员分别指定扫描主机地址以及端口号,当参数被传递到ScanThread线程函数内部时只需要将指针内的数据拷贝到自身线程函数内,即可正确的引用特定的参数。

#include <stdio.h>
#include <windows.h>

typedef struct _THREAD_PARAM
{
  char *HostAddr;             // 扫描主机
  DWORD dwStartPort;          // 端口号
}THREAD_PARAM;

// 这个扫描线程函数
DWORD WINAPI ScanThread(LPVOID lpParam)
{
  // 拷贝传递来的扫描参数
  THREAD_PARAM ScanParam = { 0 };
  MoveMemory(&ScanParam, lpParam, sizeof(THREAD_PARAM));
  printf("地址: %-16s --> 端口: %-5d \n", ScanParam.HostAddr, ScanParam.dwStartPort);
  return 0;
}

int main(int argc, char *argv[])
{
  THREAD_PARAM ThreadParam = { 0 };

  for (int ip = 0; ip < 100; ip++)
  {
    char src_addr[50] = "192.168.1.";
    char sub_addr[10] = {0};
    // int number = atoi(sub_addr);
    
    // 将整数转为字符串
    sprintf(sub_addr, "%d", ip);
    strcat(src_addr, sub_addr);

    // 将拼接好的字符串放到HostAddr
    ThreadParam.HostAddr = src_addr;

    for (DWORD port = 1; port < 10; port++)
    {
      // 指定端口号
      ThreadParam.dwStartPort = port;
      HANDLE hThread = CreateThread(NULL, 0, ScanThread, (LPVOID)&ThreadParam, 0, NULL);
      WaitForSingleObject(hThread, INFINITE);
    }
  }

  system("pause");
  return 0;
}

9.2.5 ThreadPool

Windows 线程池是一种异步执行任务的机制,可以将任务提交到线程池中,由线程池自动分配线程执行任务。线程池可以有效地利用系统资源,提高程序的并发能力和性能。Windows 线程池是Windows操作系统提供的一种原生的线程池机制,可以使用Windows API函数进行操作。

CreateThreadpoolWork 是Windows API提供的用于创建一个工作从池线程中执行的工作对象的函数之一,该函数用于创建一个工作项,并返回一个表示该工作项的指针。可以通过SubmitThreadpoolWork函数将该工作项提交到线程池中进行执行。当该工作项完成后,线程池还可以使用回调函数清理函数TP_FREE_CLEANUP_GROUP回收资源。

该函数的函数原型如下:

PTP_WORK CreateThreadpoolWork(
  PTP_WORK_CALLBACK  pfnwk,     // 工作项回调函数指针
  PVOID              pv,        // 回调函数的参数指针
  PTP_CALLBACK_ENVIRON pcbe      // 回调函数运行环境
);

参数说明:

  • pfnwk:指向工作项回调函数的指针,该函数将在工作线程池中执行。例如:
VOID CALLBACK MyWorkCallback(
    PTP_CALLBACK_INSTANCE Instance,
    PVOID Context, PTP_WORK Work)
{
    // 实现工作项的具体操作
}
  • pv:指向回调函数的参数指针,由回调函数进行处理。

  • pcbe:指向TP_CALLBACK_ENVIRON结构体的指针,提供了回调函数需要的一些运行环境信息,例如可选的回调函数执行器TP_CALLBACK_INSTANCE和回调函数完成后的清理函数TP_CLEANUP_GROUP等。如果为NULL,则使用系统提供的默认值。

CreateThreadpoolWork 函数常用于实现线程池中的任务队列,可以将一个具体的任务封装为一个工作项,并提交到线程池中等待执行。该机制可以有效地提高任务的处理速度和效率,减少系统资源开销。但是,需要注意线程池的资源占用问题,合理调优,避免线程泄漏等问题。

CallbackMayRunLong 是Windows API提供的调用标记函数之一,该函数用于标记回调函数是否可能耗时较长。如果回调函数不会耗时较长,则无需调用该函数。如果回调函数可能耗时较长,则建议在执行回调函数之前调用该函数对回调函数进行标记,以便线程池进行资源分配和调度等策略。

CallbackMayRunLong函数的函数原型如下:

VOID CallbackMayRunLong(
  PTP_CALLBACK_INSTANCE pci
);

参数说明:

  • pci:指向TP_CALLBACK_INSTANCE结构体的指针,表示回调函数的执行器,用于提供回调函数的运行环境信息。

SubmitThreadpoolWork 是Windows API提供的将工作项提交到线程池中执行的函数之一,该函数用于将工作项提交到线程池中等待被工作者线程执行。线程池中的工作者线程通过GetQueuedCompletionStatus函数从工作队列中获取工作项并执行。通过SubmitThreadpoolWorkGetQueuedCompletionStatus结合使用,可以实现线程池中的任务队列,提高任务处理效率和系统性能。

SubmitThreadpoolWork 函数的函数原型如下:

VOID SubmitThreadpoolWork(
  PTP_WORK pwk
);

参数说明:

  • pwk:指向TP_WORK结构体的指针,表示要提交到线程池中执行的工作项。

读者需要注意,SubmitThreadpoolWork 函数提交的是工作项而不是回调函数,回调函数是通过事先创建工作项指定的。在使用SubmitThreadpoolWork提交工作项时,需要根据具体的业务需求进行合理的设计和实现,避免线程池资源浪费、性能下降、内存泄漏等问题。

WaitForThreadpoolWorkCallbacks 是Windows API提供的等待线程池中工作项完成的函数之一,该函数用于等待线程池中提交的所有工作项被处理完毕。需要注意的是,该函数会阻塞当前线程直到所有工作项处理完毕,因此需要谨慎使用,避免阻塞其它线程的执行。

WaitForThreadpoolWorkCallbacks 函数的函数原型如下:

VOID WaitForThreadpoolWorkCallbacks(
  PTP_WORK pwk,
  BOOL     fCancelPendingCallbacks
);

参数说明:

  • pwk:指向TP_WORK结构体的指针,表示要等待完成的工作项。
  • fCancelPendingCallbacks:用于指定是否取消所有待处理的工作项。如果为TRUE,则取消所有待处理的工作项;如果为FALSE,则等待所有待处理的工作项被处理完毕。

要使用CreateThreadpoolWork()创建一个线程池很容易实现,读者只需要指定TaskHandler线程函数即可,当需要启动线程池时通过调用SubmitThreadpoolWork函数提交一组请求即可,如下是一个简单的线程池创建功能实现。

#include <Windows.h>
#include <iostream>
#include <stdlib.h>

unsigned long g_count = 0;

// 线程执行函数
void NTAPI TaskHandler(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
  if (CallbackMayRunLong(Instance))
  {
    printf("剩余资源: %d --> 线程ID: %d \n", InterlockedIncrement(&g_count), GetCurrentThreadId());
  }

  Sleep(5000);
  printf("运行子线程 \n");

  for (int x = 0; x < 100; x++)
  {
    printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
  }
}

int main(int argc,char *argv)
{
  PTP_WORK workItem = CreateThreadpoolWork(TaskHandler, NULL, NULL);

  for (int x = 0; x < 100; x++)
  {
    // 调用SubmitThreadpoolWork向线程池提交一个请求
    SubmitThreadpoolWork(workItem);
  }

  // 等待线程池调用结束
  WaitForThreadpoolWorkCallbacks(workItem, false);
  CloseThreadpoolWork(workItem);

  system("pause");
  return 0;
}

线程池函数同样支持限制线程数,限制线程可以通过调用SetThreadpoolThreadMinimum()实现,该函数可以在创建线程池后设置线程池的最小线程数。当线程池中的任务队列中存在待执行的任务,并且当前工作线程的数量小于最小线程数时,线程池将自动创建新的工作线程,以确保待执行任务能够及时得到处理。

以下是函数的原型定义:

VOID WINAPI SetThreadpoolThreadMinimum(
  PTP_POOL ptpp,
  DWORD     cthrdMic
);

参数说明:

  • ptpp:指向线程池对象的指针。
  • cthrdMic:线程池中的最小线程数。

线程池也支持分组操作,可通过绑定TP_CALLBACK_ENVIRON线程池环境变量实现分组,TP_CALLBACK_ENVIRON是Windows线程池API的一部分,它是一个环境变量结构体,用于确定要调用的线程池回调函数的环境。

以下是TP_CALLBACK_ENVIRON结构体的定义:

typedef struct _TP_CALLBACK_ENVIRON {
  TP_VERSION                  Version;
  PTP_POOL                    Pool;
  PTP_CLEANUP_GROUP           CleanupGroup;
  PFN_TP_SIMPLE_CALLBACK      CleanupGroupCancelCallback;
  PVOID                       RaceDll;
  struct _ACTIVATION_CONTEXT *ActivationContext;
  PFN_IO_CALLBACK             FinalizationCallback;
  union {
    DWORD Flags;
    struct {
      DWORD LongFunction : 1;
      DWORD Persistent   : 1;
      DWORD Private      : 30;
    } DUMMYSTRUCTNAME;
  } DUMMYUNIONNAME;
} TP_CALLBACK_ENVIRON, *PTP_CALLBACK_ENVIRON;

主要成员说明:

  • Version:回调环境的版本,必须为 TP_VERSION。
  • Pool:回调环境所属的线程池对象。
  • CleanupGroup:回调环境所属的清理组对象,用于控制回调的取消和资源管理。
  • CleanupGroupCancelCallback:当清理组取消回调时,所调用的回调函数。
  • RaceDll:保留字段,用于标记已经看过这个环境变量的 DLL。
  • ActivationContext:回调环境的激活上下文,用来保证回调中需要的外部资源正确加载。
  • FinalizationCallback:当回调函数执行完成后调用的函数。
  • Flags:回调环境的标志,用于设置回调函数的属性。

使用TP_CALLBACK_ENVIRON结构体,可以在创建线程池回调函数时,配置回调函数的环境和参数,以控制回调函数的执行方式和行为。

例如,可以使用TP_CALLBACK_ENVIRON中的CleanupGroupCleanupGroupCancelCallback成员,将回调函数添加到清理组中,并在需要时取消回调。又或者在FinalizationCallback中执行某些特殊的清理任务,以确保在回调函数执行完毕后释放资源。

#include <Windows.h>
#include <iostream>
#include <stdlib.h>

// 线程执行函数
void NTAPI TaskHandler(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
  for (int x = 0; x < 100; x++)
  {
    printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
  }
}

// 单次线程任务
void NTAPI poolThreadFunc(PTP_CALLBACK_INSTANCE Instance, PVOID Context)
{
  printf("执行单次线程任务: %d \n", GetCurrentThreadId());
}

int main(int argc,char *argv)
{
  // 创建线程池
  PTP_POOL pool = CreateThreadpool(NULL);

  // 设置线程池 最小与最大 资源数
  SetThreadpoolThreadMinimum(pool, 1);
  SetThreadpoolThreadMaximum(pool, 100);

  // 初始化线程池环境变量
  TP_CALLBACK_ENVIRON cbe;

  InitializeThreadpoolEnvironment(&cbe);

  // 设置线程池回调的线程池
  SetThreadpoolCallbackPool(&cbe, pool);

  // 创建清理组
  PTP_CLEANUP_GROUP cleanupGroup = CreateThreadpoolCleanupGroup();

  // 为线程池设定清理组
  SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);

  // 创建线程池
  PTP_WORK pwork = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandler, NULL, &cbe);

  // 循环提交线程工作任务
  for (int x = 0; x < 100; x++)
  {
    SubmitThreadpoolWork(pwork);
  }

  // 提交单次线程任务
  TrySubmitThreadpoolCallback(poolThreadFunc, NULL, &cbe);
  TrySubmitThreadpoolCallback(poolThreadFunc, NULL, &cbe);

  // 等待线程池结束,关闭线程组
  WaitForThreadpoolWorkCallbacks(pwork, false);
  CloseThreadpoolWork(pwork);

  // 关闭清理组
  CloseThreadpoolCleanupGroupMembers(cleanupGroup, false, NULL);
  
  // 销毁线程池环境变量
  DestroyThreadpoolEnvironment(&cbe);

  CloseThreadpool(pool);
  system("pause");
  return 0;
}

当读者使用线程池时,同样会遇到线程的同步问题,线程池内的线程函数同样支持互斥锁、信号量、内核事件控制、临界区控制等同步和互斥机制,用于保护共享资源的访问和修改。

这些同步和互斥机制可以用来解决线程间竞争和数据不一致的问题。例如,在线程池中如果有多个工作线程同时访问共享资源,就需要使用互斥锁或临界区控制来确保每个线程对共享资源的使用不会相互干扰,避免出现数据竞争和不一致的情况。

使用这些同步和互斥机制时,应该根据实际场景进行选择和设计。例如,互斥锁适合用于保护少量的共享资源、需要经常访问和更新的场景,而信号量适合用于控制并发访问数量、资源池、生产者消费者模式等场景。同时,需要注意遵循线程安全和同步的原则,以避免死锁、饥饿等问题。

#include <Windows.h>
#include <iostream>
#include <stdlib.h>

unsigned long g_count = 0;

// --------------------------------------------------------------
// 线程池同步-互斥量同步
// --------------------------------------------------------------
void NTAPI TaskHandlerMutex(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
  // 锁定资源
  WaitForSingleObject(*(HANDLE *)Context, INFINITE);

  for (int x = 0; x < 100; x++)
  {
    printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
    g_count = g_count + 1;
  }

  // 解锁资源
  ReleaseMutexWhenCallbackReturns(Instance, *(HANDLE*)Context);
}

void TestMutex()
{
  // 创建互斥量
  HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);

  PTP_WORK pool = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerMutex, &hMutex, NULL);

  for (int i = 0; i < 1000; i++)
  {
    SubmitThreadpoolWork(pool);
  }

  WaitForThreadpoolWorkCallbacks(pool, FALSE);
  CloseThreadpoolWork(pool);
  CloseHandle(hMutex);

  printf("相加后 ---> %d \n", g_count);
}

// --------------------------------------------------------------
// 线程池同步-事件内核对象
// --------------------------------------------------------------
void NTAPI TaskHandlerKern(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
  // 锁定资源
  WaitForSingleObject(*(HANDLE *)Context, INFINITE);

  for (int x = 0; x < 100; x++)
  {
    printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
    g_count = g_count + 1;
  }

  // 解锁资源
  SetEventWhenCallbackReturns(Instance, *(HANDLE*)Context);
}

void TestKern()
{
  HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
  SetEvent(hEvent);

  PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerKern, &hEvent, NULL);

  for (int i = 0; i < 1000; i++)
  {
    SubmitThreadpoolWork(pwk);
  }

  WaitForThreadpoolWorkCallbacks(pwk, FALSE);
  CloseThreadpoolWork(pwk);

  printf("相加后 ---> %d \n", g_count);
}

// --------------------------------------------------------------
// 线程池同步-信号量同步
// --------------------------------------------------------------
void NTAPI TaskHandlerSemaphore(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
  // 锁定资源
  WaitForSingleObject(*(HANDLE *)Context, INFINITE);

  for (int x = 0; x < 100; x++)
  {
    printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
    g_count = g_count + 1;
  }

  // 解锁资源
  ReleaseSemaphoreWhenCallbackReturns(Instance, *(HANDLE*)Context, 1);
}

void TestSemaphore()
{
  // 创建信号量为100
  HANDLE hSemaphore = CreateSemaphore(NULL, 0, 100, NULL);

  ReleaseSemaphore(hSemaphore, 10, NULL);

  PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerSemaphore, &hSemaphore, NULL);

  for (int i = 0; i < 1000; i++)
  {
    SubmitThreadpoolWork(pwk);
  }

  WaitForThreadpoolWorkCallbacks(pwk, FALSE);
  CloseThreadpoolWork(pwk);
  CloseHandle(hSemaphore);

  printf("相加后 ---> %d \n", g_count);
}

// --------------------------------------------------------------
// 线程池同步-临界区
// --------------------------------------------------------------
void NTAPI TaskHandlerLeave(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
  // 锁定资源
  EnterCriticalSection((CRITICAL_SECTION*)Context);

  for (int x = 0; x < 100; x++)
  {
    printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
    g_count = g_count + 1;
  }

  // 解锁资源
  LeaveCriticalSectionWhenCallbackReturns(Instance, (CRITICAL_SECTION*)Context);
}

void TestLeave()
{
  CRITICAL_SECTION cs;
  InitializeCriticalSection(&cs);

  PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerLeave, &cs, NULL);

  for (int i = 0; i < 1000; i++)
  {
    SubmitThreadpoolWork(pwk);
  }

  WaitForThreadpoolWorkCallbacks(pwk, FALSE);
  DeleteCriticalSection(&cs);
  CloseThreadpoolWork(pwk);

  printf("相加后 ---> %d \n", g_count);
}

int main(int argc,char *argv)
{
  // TestMutex();
  // TestKern();
  // TestSemaphore();
  TestLeave();

  system("pause");
  return 0;
}

本文作者: 王瑞
本文链接: https://www.lyshark.com/post/505839cb.html
版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

热门相关:豪门蜜爱:独宠天后小萌妻   楚氏赘婿   慕少,你老婆又重生了   住在隔壁的有夫之妇   紫府仙缘