View on GitHub

个人笔记

SongPinru 的小仓库

工作流

其实工作流就是一个决策树,根据用户不同的行为和属性,执行不同的动作

image-20220120010558456

type FlowElement struct {
   Id         int                    `json:"id"`
   WorkFlowId int                    `json:"workFlowId"`
   Type       string                 `json:"type"`
   Incomings  []int                  `json:"incomings"`
   Outgoings  []int                  `json:"outgoings"`
   Properties []rule.Property        `json:"properties"`
   Extensions map[string]interface{} `json:"extensions"`
}

const (
	TYPE_RULE = "Rule"

	TYPE_ACTION = "Action"

	TYPE_TRIGGER = "Trigger"

	TYPE_CASE = "Case"

	TYPE_RULE_CASE = "RuleCase"

	TYPE_TRIGGER_CASE = "TriggerCase"

	TYPE_START = "Start"

	TYPE_END = "End"
)
Line

所有的节点的key由paltId和elementID确定

//key的格式  redis
fht:workflow:platId:%s:element:id:%d:instanc
//value 
FlowElement

为了方便我们理解,简化一下,把key用数字表示,把value用Type表示,来看看redis里存了什么

0	->  Start
1	->  Line
2	->  Action
3	->  Line
4	->  Trigger
5	->  Line
6	->  End
7	->  Line
8	->  TriggerCase
9	->  Line
10	->  Rule
11	->  Line
12	->  End
13	->  Line
14	->  RuleCase
15	->  Line
16	->  Action
17	->  Line
18	->  End
19	->  Line
20	->  TriggerCase
21	->  Line
22	->  End

image-20220120021209805

现在我们模拟一下用户给这个公众号发送了一条包含开始的信息,工作流都做了什么

Http Server

  • 用户发送消息(包含开始)给公众号
  • 微信会把这个消息发送给Workflow(Callback)
  • Workflow收到了用户给公众号发消息这个事件
  • Workflow从redis里取出这个公众号(plat)的所有TriggerNode(就是上面列的Trigger+Start)
0	->  Start
4	->  Trigger

TriggerNode是workflow实际运行的节点类型,和redis里存的有点小出入

实际节点		 	  redis存的
TriggerNode		:	Trigger or Start
ActionNode		:	Action
GatewayNode		:	Rule
CaseNode		:	TriggerCase or RuleCase
EndNode			:	End
SequenceLineNode:	Line	
  • 过滤取出的节点(可执行 或 Type是Start,且 符合Rules)

    可执行 是用Bitmap存储的一个标志,所有的节点(除了Line)都有个bitmap,index是openId(微信定义的发送消息的用户的ID)映射的唯一id,如果这个bit置为1,则这个Trigger是可执行的

    Rule就是表示这个事件是哪一种,包含什么条件,比如微信事件就有很多种

image-20220120014512883

这里满足Type=Start && rule=回复关键词 && 包含开始,所以只剩下了(0 -> Start)这个节点

这里Trigger默认值是不可执行,所以不满足条件,被过滤掉了

  • 把过滤出来的节点包装成TriggerNode,然后获取下游节点

  • (1->Line),包装成SequenceLineNode,继续获取下游节点

  • (2->Action),包装成ActionNode,这里Action是给用户发送信息,就调用微信的接口,给用户发送一条消息,然后继续获取下游节点

  • (3->Line),包装成SequenceLineNode,继续获取下游节点

  • (4->Trigger),由于是Trigger,标记这个Trigger为可执行,并注册定时任务

  • 本次请求结束

我们继续模拟一下用户再给公众号发送了1,工作流做了什么

  • 用户发送消息(内容为1)给公众号
  • 微信会把这个消息发送给Workflow(Callback)
  • Workflow收到了用户给公众号发消息这个事件
  • Workflow从redis里取出这个公众号(plat)的所有TriggerNode
0	->  Start
4	->  Trigger
  • 过滤取出的节点

这里满足 可执行 && rule=回复关键词 ,所以只剩下了(4->trigger)这个节点

因为内容不包含开始,不符合Start的全部rule,所以只剩下了trigger

  • 把过滤出来的节点包装成TriggerNode,标记这个Trigger为不可执行,然后获取下游节点
  • (5->Line) (7->Line) (19->Line),包装成SequenceLineNode,判断是否符合条件,如果是继续获取下游节点

我们这里回复了1,所以(7->Line)符合条件

  • (8->TriggerCase) ,包装成CaseNode,继续获取下游节点
  • (9->Line),包装成SequenceLineNode,继续获取下游节点
  • (10->Rule),由于是Rule,包装成GatewayNode,继续获取下游节点
  • (11->Line) (13->Line) , 包装成SequenceLineNode,判断是否符合条件,如果是继续获取下游节点

假设我是扫码关注的,这里应该 (13->Line)符合条件

  • (14->RuleCase),包装成CaseNode,继续获取下游节点
  • (15->Line),包装成SequenceLineNode,继续获取下游节点
  • (16->Action),包装成ActionNode,这里Action是给用户发送信息,就调用微信的接口,给用户发送一条消息,然后继续获取下游节点
  • (17->Line),包装成SequenceLineNode,继续获取下游节点
  • (18->End),包装成EndNode
  • 本次请求结束

总结一下,各个Node都做了什么事:

  • GatewayNode:获取所有下游的节点(因为他下游有很多分支),依次判断线是否符合条件,包装成SequenceLineNode
  • TriggerNode:同GatewayNode,但是如果下游只有一条线(其实就是Start),直接包装成SequenceLineNode
  • ActionNode:执行动作,获取下游的节点,包装成SequenceLineNode
  • CaseNode:获取下游的节点,包装成SequenceLineNode
  • EndNode:结束,什么都不做
  • SequenceLineNode:比较特殊,他会匹配下游节点Type是什么,然后执行不同的逻辑,并把下游节点置为可执行

image-20220120111835833

可以看到整个流程图被Trigger分割成了两个部分,每个部分对应一次http请求,通过不同http请求,就可以实现不同的处理逻辑,用户就能收到精确的响应。

定时任务模块

当执行到Trigger的时候,会注册一个定时任务(放在redis里,使用zset),时间到了就会执行这个任务,这个就是默认Trigger分支

由于在我们的假设中没有超时,这个任务发现标记为不可执行,放弃此次任务,就不会执行默认分支


自动伸缩

workflow是利用zookeeper实现的自动扩容缩容,是个无中心的分布式架构

image-20220119220837753

server:一个实例,启动后会zk上面创建临时节点

Slot:zk上面的一个永久节点,数量固定(30个),节点存贮的值是一个serverIP

slot是对整个系统的所有任务做的逻辑上的分组,这里强制把task分成了30组,一个task的slot固定,但是slot分配到哪台节点是不固定的,一个slot只可以配分配到一个server,但是一个server可以拥有多个slot

workflow启动后做了什么?

先看一下没有服务启动时zookeeper上的状态

image-20220119222136257

第一步,把自己的IP注册到ServerPath下(注意注册的是临时节点,如果server心跳断开,节点自动消失)

image-20220119221434411

第二步,扫描SlotPath下的所有节点,并将节点内值为空的slot的值都置为自己

image-20220119222355474

第三步,注册监听事件

监听事件:

  • serverPath
    • 所有节点变化(有没有节点增加或者减少)
  • slotPath
    • 监听slot数据变动,并过滤出节点值从有值变为空的事件

监听事件对应两种情况,增加server和减少server:

增加节点

  • server2启动,在serverPath下也注册了一个临时节点
  • server2扫描slotPath,把0个slot注册成自己(因为都被server1占了),然后也注册了监听事件

image-20220119230552777

  • server1接收到ServerPath下节点数量变化的通知
  • server节点数量增加,server1计算自己最大可持有的slot数(slots/servers)x1.2
  • 如果超过最大可持有slot数,释放多余的节点(即把多余节点的值置为空)

image-20220119230036799

  • server2和server1都收到了slotPath的变动通知(每个slot是单独的一个事件)
  • server1持有的slot数大于(slots/servers),忽略分配
  • server2持有的slot数小于(slots/servers),把空的slot都注册成自己

image-20220119230302512

减少节点

假设有3个server节点,如下分布

image-20220119234028211

  • server3因为网络或其他原因和zookeeper心跳断开,serverPath下对应节点消失

  • server1,server2接收到ServerPath下节点数量变化的通知

  • server1,server2扫描slotPath,值为server2的置为空(这里由活着的节点操作,可能有并发问题,sleep一会(随机时长)再操作)

    image-20220119234404952

  • server1,server2收到了slotPath的变动通知

  • server1,server2持有的slot数小于slots/servers,把空的slot注册成自己(也可能有并发问题,sleep随机时长)

  • 当持有的节点数大于slots/servers时,就忽略分配,这样就可以均匀分配

image-20220119234543542