Linux網絡編程“驚群”問題總結

1、前言

我從事Linux系統下網絡開發將近4年了,經常還是遇到一些問題,只是知其然而不知其所以然,有時候和其他人交流,搞得非常尷尬。如今計算機都是多核了,網絡編程框架也逐步豐富多了,我所知道的有多進程、多線程、異步事件驅動常用的三種模型。最經典的模型就是Nginx中所用的Master-Worker多進程異步驅動模型。今天和大家一起討論一下網絡開發中遇到的“驚群”現象。之前只是聽說過這個現象,網上查資料也了解了基本概念,在實際的工作中還真沒有遇到過。今天周末,結合自己的理解和網上的資料,徹底將“驚群”弄明白。需要弄清楚如下幾個問題:

(1)什么是“驚群”,會產生什么問題?

(2)“驚群”的現象怎么用代碼模擬出來?

(3)如何處理“驚群”問題,處理“驚群”后的現象又是怎么樣呢?

2、何為驚群

如今網絡編程中經常用到多進程或多線程模型,大概的思路是父進程創建socket,bind、listen后,通過fork創建多個子進程,每個子進程繼承了父進程的socket,調用accpet開始監聽等待網絡連接。這個時候有多個進程同時等待網絡的連接事件,當這個事件發生時,這些進程被同時喚醒,就是“驚群”。這樣會導致什么問題呢?我們知道進程被喚醒,需要進行內核重新調度,這樣每個進程同時去響應這一個事件,而最終只有一個進程能處理事件成功,其他的進程在處理該事件失敗后重新休眠或其他。網絡模型如下圖所示:

Linux網絡編程“驚群”問題總結

簡而言之,驚群現象(thundering herd)就是當多個進程和線程在同時阻塞等待同一個事件時,如果這個事件發生,會喚醒所有的進程,但最終只可能有一個進程/線程對該事件進行處理,其他進程/線程會在失敗后重新休眠,這種性能浪費就是驚群。

3、編碼模擬“驚群”現象

我們已經知道了“驚群”是怎么回事,那么就按照上面的圖編碼實現看一下效果。我嘗試使用多進程模型,創建一個父進程綁定一個端口監聽socket,然后fork出多個子進程,子進程們開始循環處理(比如accept)這個socket。測試代碼如下所示:

?1 #include <stdio.h>

2 #include <unistd.h>

3 #include <sys/types.h>

4 #include <sys/socket.h>

5 #include <netinet/in.h>

6 #include <arpa/inet.h>

7 #include <assert.h>

8 #include <sys/wait.h>

9 #include <string.h>

10 #include <errno.h>

11

12 #define IP ? “127.0.0.1”

13 #define PORT ?8888

14 #define WORKER 4

15

16 int worker(int listenfd, int i)

17 {

18 ? ? while (1) {

19 ? ? ? ? printf(“I am worker %d, begin to accept connection.\n”, i);

20 ? ? ? ? struct sockaddr_in client_addr;

21 ? ? ? ? socklen_t client_addrlen = sizeof( client_addr );

22 ? ? ? ? int connfd = accept( listenfd, ( struct sockaddr* )&client_addr, &client_addrlen );

23 ? ? ? ? if (connfd != -1) {

24 ? ? ? ? ? ? printf(“worker %d accept a connection success.\t”, i);

25 ? ? ? ? ? ? printf(“ip :%s\t”,inet_ntoa(client_addr.sin_addr));

26 ? ? ? ? ? ? printf(“port: %d \n”,client_addr.sin_port);

27 ? ? ? ? } else {

28 ? ? ? ? ? ? printf(“worker %d accept a connection failed,error:%s”, i, strerror(errno));

close(connfd);

29 ? ? ? ? }

30 ? ? }

31 ? ? return 0;

32 }

33

34 int main()

35 {

36 ? ? int i = 0;

37 ? ? struct sockaddr_in address;

38 ? ? bzero(&address, sizeof(address));

39 ? ? address.sin_family = AF_INET;

40 ? ? inet_pton( AF_INET, IP, &address.sin_addr);

41 ? ? address.sin_port = htons(PORT);

42 ? ? int listenfd = socket(PF_INET, SOCK_STREAM, 0);

43 ? ? assert(listenfd >= 0);

44

45 ? ? int ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));

46 ? ? assert(ret != -1);

47

48 ? ? ret = listen(listenfd, 5);

49 ? ? assert(ret != -1);

50

51 ? ? for (i = 0; i < WORKER; i++) {

52 ? ? ? ? printf(“Create worker %d\n”, i+1);

53 ? ? ? ? pid_t pid = fork();

54 ? ? ? ? /*child ?process */

55 ? ? ? ? if (pid == 0) {

56 ? ? ? ? ? ? worker(listenfd, i);

57 ? ? ? ? }

58

59 ? ? ? ? if (pid < 0) {

60 ? ? ? ? ? ? printf(“fork error”);

61 ? ? ? ? }

62 ? ? }

63

64 ? ? /*wait child process*/

65 ? ? int status;

66 ? ? wait(&status);

67 ? ? return 0;

68 }

編譯執行,在本機上使用telnet 127.0.0.1 8888測試,結果如下所示:

Linux網絡編程“驚群”問題總結

按照“驚群”現象,期望結果應該是4個子進程都會accpet到請求,其中只有一個成功,另外三個失敗的情況。而實際的結果顯示,父進程開始創建4個子進程,每個子進程開始等待accept連接。當telnet連接來的時候,只有worker2 子進程accpet到請求,而其他的三個進程并沒有接收到請求。

這是什么原因呢?難道驚群現象是假的嗎?于是趕緊google查一下,驚群到底是怎么出現的。

其實在Linux2.6版本以后,內核內核已經解決了accept()函數的“驚群”問題,大概的處理方式就是,當內核接收到一個客戶連接后,?只會喚醒等待隊列上的第一個進程或線程?。所以,如果服務器采用accept阻塞調用方式,在最新的Linux系統上,已經沒有“驚群”的問題了。

但是,對于實際工程中常見的服務器程序,大都使用select、poll或epoll機制,此時,服務器不是阻塞在accept,而是阻塞在select、poll或epoll_wait,這種情況下的“驚群”仍然需要考慮。接下來以epoll為例分析:

使用epoll非阻塞實現代碼如下所示:

??1 #include <sys/types.h>

2 #include <sys/socket.h>

3 #include <sys/epoll.h>

4 #include <netdb.h>

5 #include <string.h>

6 #include <stdio.h>

7 #include <unistd.h>

8 #include <fcntl.h>

9 #include <stdlib.h>

10 #include <errno.h>

11 #include <sys/wait.h>

12 #include <unistd.h>

13

14 #define IP ? “127.0.0.1”

15 #define PORT ?8888

16 #define PROCESS_NUM 4

17 #define MAXEVENTS 64

18

19 static int create_and_bind ()

20 {

21 ? ? int fd = socket(PF_INET, SOCK_STREAM, 0);

22 ? ? struct sockaddr_in serveraddr;

23 ? ? serveraddr.sin_family = AF_INET;

24 ? ? inet_pton( AF_INET, IP, &serveraddr.sin_addr);

25 ? ? serveraddr.sin_port = htons(PORT);

26 ? ? bind(fd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));

27 ? ? return fd;

28 }

29

30 static int make_socket_non_blocking (int sfd)

31 {

32 ? ? int flags, s;

33 ? ? flags = fcntl (sfd, F_GETFL, 0);

34 ? ? if (flags == -1) {

35 ? ? ? ? perror (“fcntl”);

36 ? ? ? ? return -1;

37 ? ? }

38 ? ? flags |= O_NONBLOCK;

39 ? ? s = fcntl (sfd, F_SETFL, flags);

40 ? ? if (s == -1) {

41 ? ? ? ? perror (“fcntl”);

42 ? ? ? ? return -1;

43 ? ? }

44 ? ? return 0;

45 }

46

47 void worker(int sfd, int efd, struct epoll_event *events, int k) {

48 ? ? /* The event loop */

49 ? ? while (1) {

50 ? ? ? ? int n, i;

51 ? ? ? ? n = epoll_wait(efd, events, MAXEVENTS, -1);

52 ? ? ? ? printf(“worker ?%d return from epoll_wait!\n”, k);

53 ? ? ? ? for (i = 0; i < n; i++) {

54 ? ? ? ? ? ? if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events &EPOLLIN))) {

55 ? ? ? ? ? ? ? ? /* An error has occured on this fd, or the socket is not ready for reading (why were we notified then?) */

56 ? ? ? ? ? ? ? ? fprintf (stderr, “epoll error\n”);

57 ? ? ? ? ? ? ? ? close (events[i].data.fd);

58 ? ? ? ? ? ? ? ? continue;

59 ? ? ? ? ? ? } else if (sfd == events[i].data.fd) {

60 ? ? ? ? ? ? ? ? /* We have a notification on the listening socket, which means one or more incoming connections. */

61 ? ? ? ? ? ? ? ? struct sockaddr in_addr;

62 ? ? ? ? ? ? ? ? socklen_t in_len;

63 ? ? ? ? ? ? ? ? int infd;

64 ? ? ? ? ? ? ? ? char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

65 ? ? ? ? ? ? ? ? in_len = sizeof in_addr;

66 ? ? ? ? ? ? ? ? infd = accept(sfd, ∈_addr, ∈_len);

67 ? ? ? ? ? ? ? ? if (infd == -1) {

68 ? ? ? ? ? ? ? ? ? ? printf(“worker %d accept failed!\n”, k);

69 ? ? ? ? ? ? ? ? ? ? break;

70 ? ? ? ? ? ? ? ? }

71 ? ? ? ? ? ? ? ? printf(“worker %d accept successed!\n”, k);

72 ? ? ? ? ? ? ? ? /* Make the incoming socket non-blocking and add it to the list of fds to monitor. */

73 ? ? ? ? ? ? ? ? close(infd);

74 ? ? ? ? ? ? }

75 ? ? ? ? }

76 ? ? }

77 }

78

79 int main (int argc, char *argv[])

80 {

81 ? ? int sfd, s;

82 ? ? int efd;

83 ? ? struct epoll_event event;

84 ? ? struct epoll_event *events;

85 ? ? sfd = create_and_bind();

86 ? ? if (sfd == -1) {

87 ? ? ? ? abort ();

88 ? ? }

89 ? ? s = make_socket_non_blocking (sfd);

90 ? ? if (s == -1) {

91 ? ? ? ? abort ();

92 ? ? }

93 ? ? s = listen(sfd, SOMAXCONN);

94 ? ? if (s == -1) {

95 ? ? ? ? perror (“listen”);

96 ? ? ? ? abort ();

97 ? ? }

98 ? ? efd = epoll_create(MAXEVENTS);

99 ? ? if (efd == -1) {

100 ? ? ? ? perror(“epoll_create”);

101 ? ? ? ? abort();

102 ? ? }

103 ? ? event.data.fd = sfd;

104 ? ? event.events = EPOLLIN;

105 ? ? s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);

106 ? ? if (s == -1) {

107 ? ? ? ? perror(“epoll_ctl”);

108 ? ? ? ? abort();

109 ? ? }

110

111 ? ? /* Buffer where events are returned */

112 ? ? events = calloc(MAXEVENTS, sizeof event);

113 ? ? int k;

114 ? ? for(k = 0; k < PROCESS_NUM; k++) {

115 ? ? ? ? printf(“Create worker %d\n”, k+1);

116 ? ? ? ? int pid = fork();

117 ? ? ? ? if(pid == 0) {

118 ? ? ? ? ? ? worker(sfd, efd, events, k);

119 ? ? ? ? }

120 ? ? }

121 ? ? int status;

122 ? ? wait(&status);

123 ? ? free (events);

124 ? ? close (sfd);

125 ? ? return EXIT_SUCCESS;

126 }

父進程中創建套接字,并設置為非阻塞,開始listen。然后fork出4個子進程,在worker中調用epoll_wait開始accpet連接。使用telnet測試結果如下:

Linux網絡編程“驚群”問題總結

從結果看出,與上面是一樣的,只有一個進程接收到連接,其他三個沒有收到,說明沒有發生驚群現象。這又是為什么呢?

在早期的Linux版本中,內核對于阻塞在epoll_wait的進程,也是采用全部喚醒的機制,所以存在和accept相似的“驚群”問題。新版本的的解決方案也是?只會喚醒等待隊列上的第一個進程或線程?,所以,新版本Linux??部分的?解決了epoll的“驚群”問題。所謂?部分的?解決,意思就是:對于部分特殊場景,使用epoll機制,已經不存在“驚群”的問題了,但是對于大多數場景,epoll機制仍然存在“驚群”。

epoll存在驚群的場景如下:在worker保持工作的狀態下,都會被喚醒,例如在epoll_wait后調用sleep一次。改寫woker函數如下:

void worker(int sfd, int efd, struct epoll_event *events, int k) {

/* The event loop */

while (1) {

int n, i;

n = epoll_wait(efd, events, MAXEVENTS, -1);

/*keep running*/

sleep(2);

printf(“worker ?%d return from epoll_wait!\n”, k);

for (i = 0; i < n; i++) {

if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events &EPOLLIN))) {

/* An error has occured on this fd, or the socket is not ready for reading (why were we notified then?) */

fprintf (stderr, “epoll error\n”);

close (events[i].data.fd);

continue;

} else if (sfd == events[i].data.fd) {

/* We have a notification on the listening socket, which means one or more incoming connections. */

struct sockaddr in_addr;

socklen_t in_len;

int infd;

char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

in_len = sizeof in_addr;

infd = accept(sfd, ∈_addr, ∈_len);

if (infd == -1) {

printf(“worker %d accept failed,error:%s\n”, k, strerror(errno));

break;

}

printf(“worker %d accept successed!\n”, k);

/* Make the incoming socket non-blocking and add it to the list of fds to monitor. */

close(infd);

}

}

}

}

測試結果如下所示:

Linux網絡編程“驚群”問題總結

終于看到驚群現象的出現了。

4、解決驚群問題

Nginx中使用mutex互斥鎖解決這個問題,具體措施有使用全局互斥鎖,每個子進程在epoll_wait()之前先去申請鎖,申請到則繼續處理,獲取不到則等待,并設置了一個負載均衡的?算法?(當某一個子進程的任務量達到總設置量的7/8時,則不會再嘗試去申請鎖)來均衡各個進程的任務量。后面深入學習一下Nginx的驚群處理過程。

相關新聞

聯系我們

400-080-6560

在線咨詢:點擊這里給我發消息

郵件:[email protected]

工作時間:周一至周日,09:00-18:30

QR code
云南快乐10分开奖直播