[other] 关于go语言channel问题,这里老哥懂得多,说话又好听,只能来这问了
Tofloor
poster avatar
raspbian
deepin
2023-10-24 09:51
Author

我有一个需求

遍历一个结构体切片

其中每一个元素起一个协程,用来处理数据,并通过通道返回给主协程

已经使用了空结构体控制了协程的并发数量

希望老哥们指点迷津

我愿意为了一个优秀的解决方案付费

现在存在的问题
1.使用延时方法关闭channel具有不确定性,跑真实数据的时候不确定什么时候正常结束退出
2.使用了据说不推荐的goto LABEL 方法
package chang

import (
	"fmt"
	"time"
)

/*
模拟一个长时间的任务
*/
func mission(index int, msg chan string, limit chan struct{}) {
	// 这是一个长时间任务
	for i := 0; i < 10; i++ {
		fmt.Printf("这是第%d个协程的第%d次响应\n", index+1, i)
		time.Sleep(300 * time.Millisecond)
	}
	//实际上msg需要传输经过函数处理的内容
	msg <- fmt.Sprintf("这是第%d个协程\n", index)
	fmt.Printf("从通道中释放一个空结构体%v\n", <-limit)
}

/*
主程序,之前试过让最后一个协程关闭msg通道,但是最后一个协程并不知道一起并发的另外四个进程是否结束,有可能丢数据
现在存在的问题
1.使用延时方法关闭channel具有不确定性,跑真实数据的时候不确定什么时候正常结束退出
2.使用了据说不推荐的goto LABEL 方法
*/
func master() {
	limit := make(chan struct{}, 5)
	msg := make(chan string, 1)
	go func() {
		for i := 0; i < 50; i++ {
			limit <- struct{}{}
			go mission(i, msg, limit)
		}
	}()
	//用来保存从所有通道获取的结果
	var finally []string
	for {
		select {
		case data := <-msg:
			finally = append(finally, data)
		case <-time.After(5 * time.Second):
			fmt.Println("Timeout: No data received,after 3 second")
			goto fin
		}
	}
fin:
	fmt.Printf("finally is %v\ntype is %T\n", finally, finally)
}

Reply Favorite View the author
All Replies
liwl
deepin
2023-10-24 16:35
#1

like

老哥会说话,但是我不会,哈哈哈

Reply View the author
coldlook
deepin
2023-10-24 17:37
#2
package chang

import (
	"fmt"
	"sync"
	"time"
)

func mission(index int, msg chan string, limit chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	// 这是一个长时间任务
	for i := 0; i < 10; i++ {
		fmt.Printf("这是第%d个协程的第%d次响应\n", index+1, i)
		time.Sleep(300 * time.Millisecond)
	}
	//实际上msg需要传输经过函数处理的内容
	msg <- fmt.Sprintf("这是第%d个协程\n", index)
	<-limit
	fmt.Printf("从通道中释放一个空结构体\n")
}

func master() {
	limit := make(chan struct{}, 5)
	msg := make(chan string, 50)
	var wg sync.WaitGroup

	for i := 0; i < 50; i++ {
		limit <- struct{}{}
		wg.Add(1)
		go mission(i, msg, limit, &wg)
	}

	go func() {
		wg.Wait()
		close(msg)
	}()

	//用来保存从所有通道获取的结果
	var finally []string
	for data := range msg {
		finally = append(finally, data)
	}

	fmt.Printf("finally is %v\ntype is %T\n", finally, finally)
}

Reply View the author
myml
Super Moderator
Developer
2023-10-24 19:14
#3

楼上的老哥已经提供了实现代码。我这里贴下怎么在for+select避免goto的使用

//用来保存从所有通道获取的结果
	var loop = true 
	var finally []string
	for loop {
		select {
		case data := <-msg:
			finally = append(finally, data)
		case <-time.After(5 * time.Second):
			fmt.Println("Timeout: No data received,after 3 second")
			loop = false
		}
	}
Reply View the author
myml
Super Moderator
Developer
2023-10-24 19:21
#4

另外你的这个场景最好不要用time.After,因为这样写每次循环都会创建出一个chan,函数注释里面有写

// After waits for the duration to elapse and then sends the current time
// on the returned channel.
// It is equivalent to NewTimer(d).C.
// The underlying Timer is not recovered by the garbage collector
// until the timer fires. If efficiency is a concern, use NewTimer
// instead and call Timer.Stop if the timer is no longer needed.

Reply View the author
thepoy
deepin
2023-10-24 21:25
#5

提供个思路,使用一个 channel 控制并发数量(简单协程池),使用 for range 遍历 msg channel,协程池为空时可判定为结束全部任务,关闭 msg channel,for range 也会立即结束。

使用 sync.WaitGroup可能会产生不可控的大量的协程,不如用协池程管理。

Reply View the author
raspbian
deepin
2023-10-25 11:39
#6

给楼上的大佬们点赞

@thepoy

@myml

@Coldlook

Reply View the author