欢迎关注我的公众号获取更多原创技术文章:
嗯,Go 设计模式实战系列,一个设计模式业务真实使用的 golang 系列。
本系列主要分享,如何在我们的真实业务场景中使用设计模式。
本系列文章主要采用如下结构:
本文主要介绍「组合模式」结合 Go 语言天生的并发特性,如何在真实业务场景中使用。
之前文章《代码组件 | Go 设计模式实战》已经介绍了「组合模式」的概念,以及在业务中的使用。今天我们结合 Go 语言天生的并发特性,升级「组合模式」为「并发组合模式」。
我们先来简单回顾下「组合模式」的知识,详细可以查看上篇文章《代码组件 | Go 设计模式实战》
组合模式的概念:
一个具有层级关系的对象由一系列拥有父子关系的对象通过树形结构组成。
并发组合模式的概念:
一个具有层级关系的对象由一系列拥有父子关系的对象通过树形结构组成,子对象即可被串行执行,也可被并发执行
并发组合模式的优势:
我们还是以「组合模式」中的“订单结算页面”为例,继续来看看某东的订单结算页面:
从页面的展示形式上,可以看出:
按照「组合模式」的业务逻辑执行流程:
但是,我们很清楚有些模块之间并没有依赖,且该模块涉及服务远程调用等阻塞操作,比如:
所以:有的模块其实可以被并发的执行。
如果把上面不存在依赖关系的模块修改为并发的执行,则我们得到如下的执行流程:
关于「并发组合模式」的建模过程完全可以参考之前文章《代码组件 | Go 设计模式实战》,我们这里只说说需要着重注意的地方。
「并发组合模式」的核心还是Component
组件接口,我们先看看「组合模式」的Component
组件接口如下(再之前的文章上做了优化,进一步封装提取了BusinessLogicDo
方法):
// Component 组件接口
type Component interface {
// 添加一个子组件
Mount(c Component, components ...Component) error
// 移除一个子组件
Remove(c Component) error
// 执行当前组件业务和执行子组件
// ctx 业务上下文
// currentConponent 当前组件
Do(ctx *Context, currentConponent Component) error
// 执行当前组件业务业务逻辑
BusinessLogicDo(ctx *Context) error
// 执行子组件
ChildsDo(ctx *Context) error
}
再来看看「并发组合模式」的 Component`组件接口,如下(重点看和「组合模式」的区别):
// Component 组件接口
type Component interface {
// 添加一个子组件
Mount(c Component, components ...Component) error
// 移除一个子组件
Remove(c Component) error
// 执行当前组件业务:`BusinessLogicDo`和执行子组件:`ChildsDo`
// ctx 业务上下文
// currentConponent 当前组件
// wg 父组件的 WaitGroup 对象
// 区别 1:增加了 WaitGroup 对象参数,目的是等待并发子组件的执行完成。
Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) error
// 执行当前组件业务逻辑
// resChan 回写当前组件业务执行结果的 channel
// 区别 2:增加了一个 channel 参数,目的是并发组件执行逻辑时引入了超时机制,需要一个 channel 接受组件的执行结果
BusinessLogicDo(resChan chan interface{}) error
// 执行子组件
ChildsDo(ctx *Context) error
}
我们详细再来看,相对于「组合模式」,引入并发之后需要着重关注如下几点:
context.WithTimeout
sync.WaitGroup
select
和<-ctx.Done()
// Context 业务上下文
type Context struct {
// context.WithTimeout 派生的子上下文
TimeoutCtx context.Context
// 超时函数
context.CancelFunc
}
增加新的并发基础组件结构体BaseConcurrencyComponent
,并合成复用「组合模式」中的基础组件BaseComponent
,如下:
// BaseConcurrencyComponent 并发基础组件
type BaseConcurrencyComponent struct {
// 合成复用基础组件
BaseComponent
// 当前组件是否有并发子组件
HasChildConcurrencyComponents bool
// 并发子组件列表
ChildConcurrencyComponents []Component
// wg 对象
*sync.WaitGroup
// 当前组件业务执行结果 channel
logicResChan chan interface{}
// 当前组件执行过程中的错误信息
Err error
}
修改「组合模式」中的ChildsDo
方法,使其支持并发执行子组件,主要修改和实现如下:
go
关键字执行子组件*WaitGroup.Wait()
等待子组件执行结果// ChildsDo 执行子组件
func (bc *BaseConcurrencyComponent) ChildsDo(ctx *Context) (err error) {
if bc.WaitGroup == nil {
bc.WaitGroup = &sync.WaitGroup{}
}
// 执行并发子组件
for _, childComponent := range bc.ChildConcurrencyComponents {
bc.WaitGroup.Add(1)
go childComponent.Do(ctx, childComponent, bc.WaitGroup)
}
// 执行子组件
for _, childComponent := range bc.ChildComponents {
if err = childComponent.Do(ctx, childComponent, nil); err != nil {
return err
}
}
if bc.HasChildConcurrencyComponents {
// 等待并发组件执行结果
bc.WaitGroup.Wait()
}
return
}
select
关键字 context.WithTimeout()派生的子上下文 Done()方案返回的 channel,发生超时该 channel 会被关闭。具体实现代码如下:
// Do 执行子组件
// ctx 业务上下文
// currentConponent 当前组件
// wg 父组件的 waitgroup 对象
func (bc *BaseConcurrencyComponent) Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) (err error) {
defer wg.Done()
// 初始化并发子组件 channel
if bc.logicResChan == nil {
bc.logicResChan = make(chan interface{}, 1)
}
go currentConponent.BusinessLogicDo(bc.logicResChan)
select {
// 等待业务执行结果
case <-bc.logicResChan:
// 业务执行结果
fmt.Println(runFuncName(), "bc.BusinessLogicDo wait.done...")
break
// 超时等待
case <-ctx.TimeoutCtx.Done():
// 超时退出
fmt.Println(runFuncName(), "bc.BusinessLogicDo timeout...")
bc.Err = ErrConcurrencyComponentTimeout
break
}
// 执行子组件
err = currentConponent.ChildsDo(ctx)
return
}
var (
// ErrConcurrencyComponentTimeout 并发组件业务超时
ErrConcurrencyComponentTimeout = errors.New("Concurrency Component Timeout")
)
// Context 业务上下文
type Context struct {
// context.WithTimeout 派生的子上下文
TimeoutCtx context.Context
// 超时函数
context.CancelFunc
}
// GetContext 获取业务上下文实例
// d 超时时间
func GetContext(d time.Duration) *Context {
c := &Context{}
c.TimeoutCtx, c.CancelFunc = context.WithTimeout(context.Background(), d)
return c
}
// Component 组件接口
type Component interface {
// 添加一个子组件
Mount(c Component, components ...Component) error
// 移除一个子组件
Remove(c Component) error
// 执行当前组件业务:`BusinessLogicDo`和执行子组件:`ChildsDo`
// ctx 业务上下文
// currentConponent 当前组件
// wg 父组件的 waitgroup 对象
Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) error
// 执行当前组件业务逻辑
// resChan 回写当前组件业务执行结果的 channel
BusinessLogicDo(resChan chan interface{}) error
// 执行子组件
ChildsDo(ctx *Context) error
}
// BaseComponent 基础组件
// 实现 Add:添加一个子组件
// 实现 Remove:移除一个子组件
type BaseComponent struct {
// 子组件列表
ChildComponents []Component
}
// Mount 挂载一个子组件
func (bc *BaseComponent) Mount(c Component, components ...Component) (err error) {
bc.ChildComponents = append(bc.ChildComponents, c)
if len(components) == 0 {
return
}
bc.ChildComponents = append(bc.ChildComponents, components...)
return
}
// Remove 移除一个子组件
func (bc *BaseComponent) Remove(c Component) (err error) {
if len(bc.ChildComponents) == 0 {
return
}
for k, childComponent := range bc.ChildComponents {
if c == childComponent {
fmt.Println(runFuncName(), "移除:", reflect.TypeOf(childComponent))
bc.ChildComponents = append(bc.ChildComponents[:k], bc.ChildComponents[k+1:]...)
}
}
return
}
// Do 执行子组件
// ctx 业务上下文
// currentConponent 当前组件
// wg 父组件的 waitgroup 对象
func (bc *BaseComponent) Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) (err error) {
//执行当前组件业务代码
err = currentConponent.BusinessLogicDo(nil)
if err != nil {
return err
}
// 执行子组件
return currentConponent.ChildsDo(ctx)
}
// BusinessLogicDo 当前组件业务逻辑代码填充处
func (bc *BaseComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
// do nothing
return
}
// ChildsDo 执行子组件
func (bc *BaseComponent) ChildsDo(ctx *Context) (err error) {
// 执行子组件
for _, childComponent := range bc.ChildComponents {
if err = childComponent.Do(ctx, childComponent, nil); err != nil {
return err
}
}
return
}
// BaseConcurrencyComponent 并发基础组件
type BaseConcurrencyComponent struct {
// 合成复用基础组件
BaseComponent
// 当前组件是否有并发子组件
HasChildConcurrencyComponents bool
// 并发子组件列表
ChildConcurrencyComponents []Component
// wg 对象
*sync.WaitGroup
// 当前组件业务执行结果 channel
logicResChan chan interface{}
// 当前组件执行过程中的错误信息
Err error
}
// Remove 移除一个子组件
func (bc *BaseConcurrencyComponent) Remove(c Component) (err error) {
if len(bc.ChildComponents) == 0 {
return
}
for k, childComponent := range bc.ChildComponents {
if c == childComponent {
fmt.Println(runFuncName(), "移除:", reflect.TypeOf(childComponent))
bc.ChildComponents = append(bc.ChildComponents[:k], bc.ChildComponents[k+1:]...)
}
}
for k, childComponent := range bc.ChildConcurrencyComponents {
if c == childComponent {
fmt.Println(runFuncName(), "移除:", reflect.TypeOf(childComponent))
bc.ChildConcurrencyComponents = append(bc.ChildComponents[:k], bc.ChildComponents[k+1:]...)
}
}
return
}
// MountConcurrency 挂载一个并发子组件
func (bc *BaseConcurrencyComponent) MountConcurrency(c Component, components ...Component) (err error) {
bc.HasChildConcurrencyComponents = true
bc.ChildConcurrencyComponents = append(bc.ChildConcurrencyComponents, c)
if len(components) == 0 {
return
}
bc.ChildConcurrencyComponents = append(bc.ChildConcurrencyComponents, components...)
return
}
// ChildsDo 执行子组件
func (bc *BaseConcurrencyComponent) ChildsDo(ctx *Context) (err error) {
if bc.WaitGroup == nil {
bc.WaitGroup = &sync.WaitGroup{}
}
// 执行并发子组件
for _, childComponent := range bc.ChildConcurrencyComponents {
bc.WaitGroup.Add(1)
go childComponent.Do(ctx, childComponent, bc.WaitGroup)
}
// 执行子组件
for _, childComponent := range bc.ChildComponents {
if err = childComponent.Do(ctx, childComponent, nil); err != nil {
return err
}
}
if bc.HasChildConcurrencyComponents {
// 等待并发组件执行结果
bc.WaitGroup.Wait()
}
return
}
// Do 执行子组件
// ctx 业务上下文
// currentConponent 当前组件
// wg 父组件的 waitgroup 对象
func (bc *BaseConcurrencyComponent) Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) (err error) {
defer wg.Done()
// 初始化并发子组件 channel
if bc.logicResChan == nil {
bc.logicResChan = make(chan interface{}, 1)
}
go currentConponent.BusinessLogicDo(bc.logicResChan)
select {
// 等待业务执行结果
case <-bc.logicResChan:
// 业务执行结果
fmt.Println(runFuncName(), "bc.BusinessLogicDo wait.done...")
break
// 超时等待
case <-ctx.TimeoutCtx.Done():
// 超时退出
fmt.Println(runFuncName(), "bc.BusinessLogicDo timeout...")
bc.Err = ErrConcurrencyComponentTimeout
break
}
// 执行子组件
err = currentConponent.ChildsDo(ctx)
return
}
// CheckoutPageComponent 订单结算页面组件
type CheckoutPageComponent struct {
// 合成复用基础组件
BaseConcurrencyComponent
}
// BusinessLogicDo 当前组件业务逻辑代码填充处
func (bc *CheckoutPageComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
// 当前组件的业务逻辑写这
fmt.Println(runFuncName(), "订单结算页面组件...")
return
}
// AddressComponent 地址组件
type AddressComponent struct {
// 合成复用基础组件
BaseConcurrencyComponent
}
// BusinessLogicDo 并发组件实际填充业务逻辑的地方
func (bc *AddressComponent) BusinessLogicDo(resChan chan interface{}) error {
fmt.Println(runFuncName(), "地址组件...")
fmt.Println(runFuncName(), "获取地址信息 ing...")
// 模拟远程调用地址服务
http.Get("http://example.com/")
resChan <- struct{}{} // 写入业务执行结果
fmt.Println(runFuncName(), "获取地址信息 done...")
return nil
}
// PayMethodComponent 支付方式组件
type PayMethodComponent struct {
// 合成复用基础组件
BaseConcurrencyComponent
}
// BusinessLogicDo 并发组件实际填充业务逻辑的地方
func (bc *PayMethodComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
// 当前组件的业务逻辑写这
fmt.Println(runFuncName(), "支付方式组件...")
fmt.Println(runFuncName(), "获取支付方式 ing...")
// 模拟远程调用地址服务 略
resChan <- struct{}{}
fmt.Println(runFuncName(), "获取支付方式 done...")
return nil
}
// StoreComponent 店铺组件
type StoreComponent struct {
// 合成复用基础组件
BaseComponent
}
// BusinessLogicDo 并发组件实际填充业务逻辑的地方
func (bc *StoreComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
// 当前组件的业务逻辑写这
fmt.Println(runFuncName(), "店铺组件...")
return
}
// SkuComponent 商品组件
type SkuComponent struct {
// 合成复用基础组件
BaseComponent
}
// BusinessLogicDo 并发组件实际填充业务逻辑的地方
func (bc *SkuComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
// 当前组件的业务逻辑写这
fmt.Println(runFuncName(), "商品组件...")
return
}
// PromotionComponent 优惠信息组件
type PromotionComponent struct {
// 合成复用基础组件
BaseComponent
}
// BusinessLogicDo 并发组件实际填充业务逻辑的地方
func (bc *PromotionComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
// 当前组件的业务逻辑写这
fmt.Println(runFuncName(), "优惠信息组件...")
return
}
// ExpressComponent 物流组件
type ExpressComponent struct {
// 合成复用基础组件
BaseComponent
}
// BusinessLogicDo 并发组件实际填充业务逻辑的地方
func (bc *ExpressComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
// 当前组件的业务逻辑写这
fmt.Println(runFuncName(), "物流组件...")
return
}
// AftersaleComponent 售后组件
type AftersaleComponent struct {
// 合成复用基础组件
BaseComponent
}
// BusinessLogicDo 并发组件实际填充业务逻辑的地方
func (bc *AftersaleComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
// 当前组件的业务逻辑写这
fmt.Println(runFuncName(), "售后组件...")
return
}
// InvoiceComponent 发票组件
type InvoiceComponent struct {
// 合成复用基础组件
BaseConcurrencyComponent
}
// BusinessLogicDo 并发组件实际填充业务逻辑的地方
func (bc *InvoiceComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
// 当前组件的业务逻辑写这
fmt.Println(runFuncName(), "发票组件...")
fmt.Println(runFuncName(), "获取发票信息 ing...")
// 模拟远程调用地址服务 略
resChan <- struct{}{} // 写入业务执行结果
fmt.Println(runFuncName(), "获取发票信息 done...")
return
}
// CouponComponent 优惠券组件
type CouponComponent struct {
// 合成复用基础组件
BaseConcurrencyComponent
}
// BusinessLogicDo 并发组件实际填充业务逻辑的地方
func (bc *CouponComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
// 当前组件的业务逻辑写这
fmt.Println(runFuncName(), "优惠券组件...")
fmt.Println(runFuncName(), "获取最优优惠券 ing...")
// 模拟远程调用优惠券服务
http.Get("http://example.com/")
// 写入业务执行结果
resChan <- struct{}{}
fmt.Println(runFuncName(), "获取最优优惠券 done...")
return
}
// GiftCardComponent 礼品卡组件
type GiftCardComponent struct {
// 合成复用基础组件
BaseConcurrencyComponent
}
// BusinessLogicDo 并发组件实际填充业务逻辑的地方
func (bc *GiftCardComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
// 当前组件的业务逻辑写这
fmt.Println(runFuncName(), "礼品卡组件...")
fmt.Println(runFuncName(), "获取礼品卡信息 ing...")
// 模拟远程调用地址服务 略
resChan <- struct{}{} // 写入业务执行结果
fmt.Println(runFuncName(), "获取礼品卡信息 done...")
return
}
// OrderComponent 订单金额详细信息组件
type OrderComponent struct {
// 合成复用基础组件
BaseComponent
}
// BusinessLogicDo 当前组件业务逻辑代码填充处
func (bc *OrderComponent) BusinessLogicDo(resChan chan interface{}) (err error) {
// 当前组件的业务逻辑写这
fmt.Println(runFuncName(), "订单金额详细信息组件...")
return
}
// Demo 示例
func Demo() {
// 初始化订单结算页面 这个大组件
checkoutPage := &CheckoutPageComponent{}
// 挂载子组件
storeComponent := &StoreComponent{}
skuComponent := &SkuComponent{}
skuComponent.Mount(
&PromotionComponent{},
&AftersaleComponent{},
)
storeComponent.Mount(
skuComponent,
&ExpressComponent{},
)
// ---挂载组件---
// 普通组件
checkoutPage.Mount(
storeComponent,
&OrderComponent{},
)
// 并发组件
checkoutPage.MountConcurrency(
&AddressComponent{},
&PayMethodComponent{},
&InvoiceComponent{},
&CouponComponent{},
&GiftCardComponent{},
)
// 初始化业务上下文 并设置超时时间
ctx := GetContext(5 * time.Second)
defer ctx.CancelFunc()
// 开始构建页面组件数据
checkoutPage.ChildsDo(ctx)
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU() - 1)
DemoConcurrency(
}
// 获取正在运行的函数名
func runFuncName() string {
pc := make([]uintptr, 1)
runtime.Callers(2, pc)
f := runtime.FuncForPC(pc[0])
return f.Name()
return ""
}
代码运行结果:
Running] go run "../easy-tips/go/patterns/composite/concurrency/composite-concurrency.go"
main.(*StoreComponent).BusinessLogicDo 店铺组件...
main.(*SkuComponent).BusinessLogicDo 商品组件...
main.(*PromotionComponent).BusinessLogicDo 优惠信息组件...
main.(*AftersaleComponent).BusinessLogicDo 售后组件...
main.(*ExpressComponent).BusinessLogicDo 物流组件...
main.(*OrderComponent).BusinessLogicDo 订单金额详细信息组件...
main.(*PayMethodComponent).BusinessLogicDo 支付方式组件...
main.(*PayMethodComponent).BusinessLogicDo 获取支付方式 ing...
main.(*InvoiceComponent).BusinessLogicDo 发票组件...
main.(*InvoiceComponent).BusinessLogicDo 获取发票信息 ing...
main.(*GiftCardComponent).BusinessLogicDo 礼品卡组件...
main.(*GiftCardComponent).BusinessLogicDo 获取礼品卡信息 ing...
main.(*CouponComponent).BusinessLogicDo 优惠券组件...
main.(*CouponComponent).BusinessLogicDo 获取发票信息 ing...
main.(*AddressComponent).BusinessLogicDo 地址组件...
main.(*AddressComponent).BusinessLogicDo 获取地址信息 ing...
main.(*InvoiceComponent).BusinessLogicDo 获取发票信息 done...
main.(*BaseConcurrencyComponent).Do bc.BusinessLogicDo wait.done...
main.(*BaseConcurrencyComponent).Do bc.BusinessLogicDo wait.done...
main.(*PayMethodComponent).BusinessLogicDo 获取支付方式 done...
main.(*AddressComponent).BusinessLogicDo 获取地址信息 done...
main.(*BaseConcurrencyComponent).Do bc.BusinessLogicDo wait.done...
main.(*CouponComponent).BusinessLogicDo 获取发票信息 done...
main.(*BaseConcurrencyComponent).Do bc.BusinessLogicDo wait.done...
main.(*GiftCardComponent).BusinessLogicDo 获取礼品卡信息 done...
main.(*BaseConcurrencyComponent).Do bc.BusinessLogicDo wait.done...
基准测试代码:
func Benchmark_Normal(b *testing.B) {
b.SetParallelism(runtime.NumCPU())
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
normal.Demo()
}
})
}
func Benchmark_Concurrency(b *testing.B) {
b.SetParallelism(runtime.NumCPU())
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
concurrency.Demo()
}
})
}
本地机器 Benchmark 对比测试结果:
go test -benchmem -run=^$ easy-tips/go/patterns/composite -bench . -v -count=1 --benchtime 20s
goos: darwin
goarch: amd64
pkg: easy-tips/go/patterns/composite
Benchmark_Normal-4 376 56666895 ns/op 35339 B/op 286 allocs/op
Benchmark_Concurrency-4 715 32669301 ns/op 36445 B/op 299 allocs/op
从上面的基准测试结果可以看出来Benchmark_Concurrency-4
平均每次的执行时间是32669301 ns
是要优于Benchmark_Normal
的56666895 ns
。
「并发组合模式」是一个由特定的设计模式结合 Go 语言天生的并发特性,通过适当封装形成的“新模式”。
特别说明:
本系列的一些设计模式的概念可能和原概念存在差异,因为会结合实际使用,取其精华,适当改变,灵活使用。
获取更多原创技术文章:
1
fo0o7hU2tr6v6TCe 2020-11-05 16:15:30 +08:00 1
感觉很牛逼,收藏了
借个楼,求大佬们多推荐推荐这些文章 |
2
wysnylc 2020-11-05 16:24:52 +08:00 1
总结:乱造新名词,本质上还是响应式编程,并行编程,Future 模式那一套
|
3
PiersSoCool 2020-11-05 16:29:55 +08:00 1
我没仔细看,不就是用 wait.group 进行同步么?
|
4
jiazhoulvke 2020-11-05 17:45:59 +08:00 1
学习了,感谢分享
|
5
lbp0200 2020-11-05 18:18:43 +08:00 1
脑子笨,看见设计模式,就想逃
|
6
TIGERB OP @PiersSoCool 是的 和 组合模式结合了下 进行了一次封装
|
8
TIGERB OP @jiazhoulvke 互相学习~
|