个人笔记
SongPinru 的小仓库
工作流
其实工作流就是一个决策树,根据用户不同的行为和属性,执行不同的动作
type FlowElement struct {
Id int `json:"id"`
WorkFlowId int `json:"workFlowId"`
Type string `json:"type"`
Incomings []int `json:"incomings"`
Outgoings []int `json:"outgoings"`
Properties []rule.Property `json:"properties"`
Extensions map[string]interface{} `json:"extensions"`
}
const (
TYPE_RULE = "Rule"
TYPE_ACTION = "Action"
TYPE_TRIGGER = "Trigger"
TYPE_CASE = "Case"
TYPE_RULE_CASE = "RuleCase"
TYPE_TRIGGER_CASE = "TriggerCase"
TYPE_START = "Start"
TYPE_END = "End"
)
Line
所有的节点的key由paltId和elementID确定
//key的格式 redis
fht:workflow:platId:%s:element:id:%d:instanc
//value
FlowElement
为了方便我们理解,简化一下,把key用数字表示,把value用Type表示,来看看redis里存了什么
0 -> Start
1 -> Line
2 -> Action
3 -> Line
4 -> Trigger
5 -> Line
6 -> End
7 -> Line
8 -> TriggerCase
9 -> Line
10 -> Rule
11 -> Line
12 -> End
13 -> Line
14 -> RuleCase
15 -> Line
16 -> Action
17 -> Line
18 -> End
19 -> Line
20 -> TriggerCase
21 -> Line
22 -> End
现在我们模拟一下用户给这个公众号发送了一条包含开始的信息,工作流都做了什么
Http Server
- 用户发送消息(包含开始)给公众号
- 微信会把这个消息发送给Workflow(Callback)
- Workflow收到了用户给公众号发消息这个事件
- Workflow从redis里取出这个公众号(plat)的所有TriggerNode(就是上面列的Trigger+Start)
0 -> Start
4 -> Trigger
TriggerNode是workflow实际运行的节点类型,和redis里存的有点小出入
实际节点 redis存的
TriggerNode : Trigger or Start
ActionNode : Action
GatewayNode : Rule
CaseNode : TriggerCase or RuleCase
EndNode : End
SequenceLineNode: Line
-
过滤取出的节点(可执行 或 Type是Start,且 符合Rules)
可执行 是用Bitmap存储的一个标志,所有的节点(除了Line)都有个bitmap,index是openId(微信定义的发送消息的用户的ID)映射的唯一id,如果这个bit置为1,则这个Trigger是可执行的
Rule就是表示这个事件是哪一种,包含什么条件,比如微信事件就有很多种
这里满足Type=Start && rule=回复关键词 && 包含开始,所以只剩下了(0 -> Start)这个节点
这里Trigger默认值是不可执行,所以不满足条件,被过滤掉了
-
把过滤出来的节点包装成TriggerNode,然后获取下游节点
-
(1->Line),包装成SequenceLineNode,继续获取下游节点
-
(2->Action),包装成ActionNode,这里Action是给用户发送信息,就调用微信的接口,给用户发送一条消息,然后继续获取下游节点
-
(3->Line),包装成SequenceLineNode,继续获取下游节点
-
(4->Trigger),由于是Trigger,标记这个Trigger为可执行,并注册定时任务
-
本次请求结束
我们继续模拟一下用户再给公众号发送了1,工作流做了什么
- 用户发送消息(内容为1)给公众号
- 微信会把这个消息发送给Workflow(Callback)
- Workflow收到了用户给公众号发消息这个事件
- Workflow从redis里取出这个公众号(plat)的所有TriggerNode
0 -> Start
4 -> Trigger
- 过滤取出的节点
这里满足 可执行 && rule=回复关键词 ,所以只剩下了(4->trigger)这个节点
因为内容不包含开始,不符合Start的全部rule,所以只剩下了trigger
- 把过滤出来的节点包装成TriggerNode,标记这个Trigger为不可执行,然后获取下游节点
- (5->Line) (7->Line) (19->Line),包装成SequenceLineNode,判断是否符合条件,如果是继续获取下游节点
我们这里回复了1,所以(7->Line)符合条件
- (8->TriggerCase) ,包装成CaseNode,继续获取下游节点
- (9->Line),包装成SequenceLineNode,继续获取下游节点
- (10->Rule),由于是Rule,包装成GatewayNode,继续获取下游节点
- (11->Line) (13->Line) , 包装成SequenceLineNode,判断是否符合条件,如果是继续获取下游节点
假设我是扫码关注的,这里应该 (13->Line)符合条件
- (14->RuleCase),包装成CaseNode,继续获取下游节点
- (15->Line),包装成SequenceLineNode,继续获取下游节点
- (16->Action),包装成ActionNode,这里Action是给用户发送信息,就调用微信的接口,给用户发送一条消息,然后继续获取下游节点
- (17->Line),包装成SequenceLineNode,继续获取下游节点
- (18->End),包装成EndNode
- 本次请求结束
总结一下,各个Node都做了什么事:
- GatewayNode:获取所有下游的节点(因为他下游有很多分支),依次判断线是否符合条件,包装成SequenceLineNode
- TriggerNode:同GatewayNode,但是如果下游只有一条线(其实就是Start),直接包装成SequenceLineNode
- ActionNode:执行动作,获取下游的节点,包装成SequenceLineNode
- CaseNode:获取下游的节点,包装成SequenceLineNode
- EndNode:结束,什么都不做
- SequenceLineNode:比较特殊,他会匹配下游节点Type是什么,然后执行不同的逻辑,并把下游节点置为可执行
可以看到整个流程图被Trigger分割成了两个部分,每个部分对应一次http请求,通过不同http请求,就可以实现不同的处理逻辑,用户就能收到精确的响应。
定时任务模块
当执行到Trigger的时候,会注册一个定时任务(放在redis里,使用zset),时间到了就会执行这个任务,这个就是默认Trigger分支
由于在我们的假设中没有超时,这个任务发现标记为不可执行,放弃此次任务,就不会执行默认分支
自动伸缩
workflow是利用zookeeper实现的自动扩容缩容,是个无中心的分布式架构
server:一个实例,启动后会zk上面创建临时节点
Slot:zk上面的一个永久节点,数量固定(30个),节点存贮的值是一个serverIP
slot是对整个系统的所有任务做的逻辑上的分组,这里强制把task分成了30组,一个task的slot固定,但是slot分配到哪台节点是不固定的,一个slot只可以配分配到一个server,但是一个server可以拥有多个slot
workflow启动后做了什么?
先看一下没有服务启动时zookeeper上的状态
第一步,把自己的IP注册到ServerPath下(注意注册的是临时节点,如果server心跳断开,节点自动消失)
第二步,扫描SlotPath下的所有节点,并将节点内值为空的slot的值都置为自己
第三步,注册监听事件
监听事件:
- serverPath
- 所有节点变化(有没有节点增加或者减少)
- slotPath
- 监听slot数据变动,并过滤出节点值从有值变为空的事件
监听事件对应两种情况,增加server和减少server:
增加节点
- server2启动,在serverPath下也注册了一个临时节点
- server2扫描slotPath,把0个slot注册成自己(因为都被server1占了),然后也注册了监听事件
- server1接收到ServerPath下节点数量变化的通知
- server节点数量增加,server1计算自己最大可持有的slot数(slots/servers)x1.2
- 如果超过最大可持有slot数,释放多余的节点(即把多余节点的值置为空)
- server2和server1都收到了slotPath的变动通知(每个slot是单独的一个事件)
- server1持有的slot数大于(slots/servers),忽略分配
- server2持有的slot数小于(slots/servers),把空的slot都注册成自己
减少节点
假设有3个server节点,如下分布
-
server3因为网络或其他原因和zookeeper心跳断开,serverPath下对应节点消失
-
server1,server2接收到ServerPath下节点数量变化的通知
-
server1,server2扫描slotPath,值为server2的置为空(这里由活着的节点操作,可能有并发问题,sleep一会(随机时长)再操作)
-
server1,server2收到了slotPath的变动通知
-
server1,server2持有的slot数小于slots/servers,把空的slot注册成自己(也可能有并发问题,sleep随机时长)
-
当持有的节点数大于slots/servers时,就忽略分配,这样就可以均匀分配