golang并发请求http几种方式

golang并发请求http几种方式

并发方式

平时常用到下面几种方式做并发请求(本文共演示3种写法).

  1. 简单粗暴, 一次请求开一个goroutine(协程), 在goroutine中完成请求与响应处理.

  2. 优雅一点, 限制goroutine数量, 以master-worker的方式处理并发, 最后再把响应统一处理.

1.简单并发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main

import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
)

// main starts one goroutine per phone number, waits for all of them to finish
// and prints the total elapsed time.
func main() {
	// Numbers to look up; every entry is queried in its own goroutine.
	numberTasks := [3]string{"13456755448", "13419385751", "13419317885"}

	// A single client is shared by all requests. The timeout bounds each
	// request so that a stalled connection cannot block wg.Wait forever.
	client = &http.Client{Timeout: 10 * time.Second}

	beg := time.Now()
	wg := &sync.WaitGroup{}
	for _, keyword := range numberTasks {
		wg.Add(1)
		// keyword is passed as an argument so each goroutine gets its own copy.
		go func(keyword string, group *sync.WaitGroup) {
			// Deferred so the WaitGroup is released on every exit path.
			defer group.Done()

			body, err := NumberQueryRequest(keyword)
			if err != nil {
				fmt.Printf("error occurred in query keyword: %s, error: %s\n",
					keyword,
					err.Error())
				return
			}
			fmt.Printf("search %s success, data size is %d\n, body is %s\n",
				keyword,
				len(body),
				string(body))
		}(keyword, wg)
	}
	wg.Wait() // block until every goroutine has called Done
	fmt.Printf("time consumed: %fs", time.Since(beg).Seconds())
}

// client is the HTTP client shared by all requests. It is assigned in main
// before any request is issued.
var client *http.Client

// NumberQueryRequest sends a GET request for keyword through the shared client
// and returns the raw response body.
//
// It returns a non-nil error when the request fails, when the response status
// is not 200 OK (the error text then carries the status code and the response
// body), or when reading the body fails. On error the returned body is nil.
func NumberQueryRequest(keyword string) (body []byte, err error) {
	baseUrl := "https://api.binstd.com/shouji/query?appkey=df2720f76a0991fa&shouji="
	resp, err := client.Get(baseUrl + keyword)
	if err != nil {
		return nil, err
	}
	// Registered before any further return so the body is closed on every
	// path, including the non-200 one.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message, so a
		// read failure here is deliberately ignored.
		data, _ := ioutil.ReadAll(resp.Body)
		return nil, fmt.Errorf("response status code is not OK, response code is %d, body:%s",
			resp.StatusCode,
			string(data))
	}

	body, err = ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return body, nil
}

解析

  • 在for循环中有多少个任务就go出去多少个协程,没有限制,在查询量固定或者不大的时候没什么问题.
  • 没有用到 channel的特性, 仅利用了多核调度.

2. master-worker形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package main

import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
)
const (
routineCountTotal = 5 // number of worker goroutines main starts, i.e. the concurrency limit
)

// main runs the master-worker pipeline: it starts routineCountTotal workers,
// feeds them phone numbers through the tasks channel, prints every result
// delivered on the results channel and finally reports the elapsed time.
//
// Channel ownership follows the "sender closes" rule: main is the only sender
// on tasks and closes it once every task is dispatched; results is closed
// after all workers (its only senders) have returned. No sentinel value is
// sent, so an empty result string is just printed like any other.
func main() {
	numberTasks := [5]string{"13456755448", "13419385751", "13419317885", "13434343439", "13438522395"}

	// The timeout bounds each request so a stalled connection cannot keep a
	// worker, and therefore wg.Wait, blocked forever.
	client = &http.Client{Timeout: 10 * time.Second}
	beg := time.Now()

	wg := &sync.WaitGroup{}
	tasks := make(chan string)
	results := make(chan string)

	// Receiver: prints results until the results channel is closed, then
	// signals completion by closing done.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for result := range results {
			fmt.Println("result:", result)
		}
	}()

	for i := 0; i < routineCountTotal; i++ {
		wg.Add(1)
		go worker(wg, tasks, results)
	}

	// Dispatch the tasks, then close the channel so the workers' range loops end.
	for _, task := range numberTasks {
		tasks <- task
	}
	close(tasks)

	wg.Wait()      // every worker has returned, so nothing can send on results any more
	close(results) // lets the receiver's range loop finish
	<-done         // wait until the last result has been printed

	fmt.Printf("time consumed: %fs", time.Since(beg).Seconds())
}

// worker reads phone numbers from tasks until that channel is closed, queries
// each one with NumberQueryRequest and sends the outcome on result: the
// response body on success, the error text on failure. An empty task string is
// treated as the end-of-work marker: the worker closes tasks, which ends the
// range loop of every worker. group.Done is called when the worker returns.
func worker(group *sync.WaitGroup, tasks chan string, result chan string) {
	defer group.Done()

	for number := range tasks {
		if number == "" {
			// End marker: closing tasks terminates this loop on its next
			// iteration and releases the other workers as well.
			close(tasks)
			continue
		}

		payload, err := NumberQueryRequest(number)
		if err != nil {
			fmt.Printf("error occurred in NumberQueryRequest: %s\n", number)
			result <- err.Error()
			continue
		}
		result <- string(payload)
	}
}

// client is the HTTP client shared by all requests. It is assigned in main
// before any request is issued.
var client *http.Client

// NumberQueryRequest sends a GET request for keyword through the shared client,
// with a browser-like User-Agent header, and returns the raw response body.
//
// It returns a non-nil error when the request cannot be built or sent, when
// the response status is not 200 OK (the error text then carries the status
// code and the response body), or when reading the body fails. On error the
// returned body is nil.
func NumberQueryRequest(keyword string) (body []byte, err error) {
	url := fmt.Sprintf("https://api.binstd.com/shouji/query?appkey=df2720f76a0991fa&shouji=%s", keyword)
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36")

	// Do sends the prepared request; Get would build a fresh one and drop the
	// header set above.
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	// Registered before any further return so the body is closed on every
	// path, including the non-200 one.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message, so a
		// read failure here is deliberately ignored.
		data, _ := ioutil.ReadAll(resp.Body)
		return nil, fmt.Errorf("response status code is not OK, response code is %d, body:%s", resp.StatusCode, string(data))
	}

	body, err = ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return body, nil
}

3. 使用带缓冲区的chan控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package golimit

// Glimit caps how many functions started through Run execute at the same time.
// It uses the buffered channel C as a counting semaphore.
type Glimit struct {
	Num int           // maximum number of functions allowed to run concurrently
	C   chan struct{} // semaphore: one slot is occupied per running function
}

// NewG returns a Glimit that lets at most num functions run concurrently.
// Values below 1 are raised to 1: a zero-capacity channel would make the first
// Run block forever, and a negative capacity would panic in make.
func NewG(num int) *Glimit {
	if num < 1 {
		num = 1
	}
	return &Glimit{
		Num: num,
		C:   make(chan struct{}, num),
	}
}

// Run executes f in a new goroutine. It blocks the caller until a slot is
// free, i.e. while Num functions are already running, and frees the slot when
// f returns. Run does not wait for f to finish.
func (g *Glimit) Run(f func()) {
	g.C <- struct{}{} // acquire a slot; blocks while the limit is reached
	go func() {
		// Deferred so the slot is handed back however f exits.
		defer func() { <-g.C }()
		f()
	}()
}

使用方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// main demonstrates the limiter: it schedules ten jobs through a Glimit that
// allows two to run at a time, and waits until all of them have finished.
func main() {
	const total = 10

	// At most two jobs run concurrently.
	limiter := NewG(2)

	var wg sync.WaitGroup
	for i := 0; i < total; i++ {
		wg.Add(1)
		n := i // per-iteration copy captured by the closure below
		limiter.Run(func() {
			defer wg.Done()
			// Placeholder for real business logic.
			fmt.Printf("go func: %d\n", n)
			time.Sleep(time.Second)
		})
	}
	wg.Wait()
}

并发请求的用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package main

import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
"./golimit"
)

const (
routineCountTotal = 5 // concurrency limit handed to golimit.NewG in main
)

// main queries every phone number through a golimit limiter that allows at
// most routineCountTotal requests in flight, waits for all of them to finish
// and prints the total elapsed time.
func main() {
	numberTasks := [5]string{"13456755448", "13419385751", "13419317885", "13434343439", "13438522395"}

	g := golimit.NewG(routineCountTotal)
	wg := &sync.WaitGroup{}

	// The timeout bounds each request so a stalled connection cannot keep
	// wg.Wait blocked forever.
	client = &http.Client{Timeout: 10 * time.Second}

	beg := time.Now()
	for _, task := range numberTasks {
		wg.Add(1)
		task := task // per-iteration copy captured by the closure below
		g.Run(func() {
			// Deferred so the WaitGroup is released on every exit path.
			defer wg.Done()

			respBody, err := NumberQueryRequest(task)
			if err != nil {
				// Include the error itself so a failure can be diagnosed.
				fmt.Printf("error occurred in NumberQueryRequest: %s, error: %v\n", task, err)
				return
			}
			fmt.Printf("response data: %s\n", string(respBody))
		})
	}
	wg.Wait()
	fmt.Printf("time consumed: %fs", time.Since(beg).Seconds())
}
// client is the HTTP client shared by all requests. It is assigned in main
// before any request is issued.
var client *http.Client

// NumberQueryRequest sends a GET request for keyword through the shared client,
// with a browser-like User-Agent header, and returns the raw response body.
//
// It returns a non-nil error when the request cannot be built or sent, when
// the response status is not 200 OK (the error text then carries the status
// code and the response body), or when reading the body fails. On error the
// returned body is nil.
func NumberQueryRequest(keyword string) (body []byte, err error) {
	url := fmt.Sprintf("https://api.binstd.com/shouji/query?appkey=df2720f76a0991fa&shouji=%s", keyword)
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36")

	// Do sends the prepared request; Get would build a fresh one and drop the
	// header set above.
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	// Registered before any further return so the body is closed on every
	// path, including the non-200 one.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message, so a
		// read failure here is deliberately ignored.
		data, _ := ioutil.ReadAll(resp.Body)
		return nil, fmt.Errorf("response status code is not OK, response code is %d, body:%s", resp.StatusCode, string(data))
	}

	body, err = ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return body, nil
}

解析

  • 比第一种方式更规范一些, goroutine数量可以控制.
  • 第2种(master-worker)写法里两处close通道的地方都用了传特殊空值的方式, 似乎不太优雅, 欢迎有更好的关闭通道的方式互相交流, 谢谢. 一种更常见的做法是: 由发送方在全部发送完毕后直接close通道, 接收方用for range读取, 通道关闭后循环自然结束, 无需特殊空值.