Fabric 源码分析
⼀、链码
1、开发模板
链码开发实现Chaincode接口,Chaincode接口里面只有两个方法,一个是Init()方
法(链码初始化),另一个是Invoke()方法(链码调用主要逻辑),Chaincode接口如
下:
1
2
3
4
5
6
/*
所有链码必须实现Chaincode接口的Init()和Invoke()方法,
并且fabric需要调用这两个方法来执行交易逻辑
*/
type Chaincode interface {
// Init is called during Instantiate transaction after the chaincode
container
// has been established for the first time, allowing the chaincode to
// initialize its internal data
/*
Init()方法在首次建立链码容器后负责实例化,并初始化数据
*/
Init(stub ChaincodeStubInterface) pb.Response
7
8
9
10
11
12
13 // Invoke is called to update or query the ledger in a proposal
14
transaction.
// Updated state variables are not committed to the ledger until the
// transaction is committed.
/*
一次交易提案会通过调用Invoke()方法执行,包括查询或者更新账本
交易成功提交后,更新的状态变量才会提交到账本中
*/
Invoke(stub ChaincodeStubInterface) pb.Response
}
在链码开发的过程中,主要涉及到ChaincodeStubInterface的以下方法,可以直接
使用,方法列表如下:
type ChaincodeStubInterface interface {
//获取参数,返回Invoke()方法名称,以及参数列表
GetFunctionAndParameters() (string, []string)
//获取TxID,也就是每次调用的提案ID
GetTxID() string
//获取KV键值Value
GetState(key string) ([]byte, error)
15
16
17
18
19
20
21
22
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//更新KV键值
PutState(key string, value []byte) error
//删除KV键值
DelState(key string) error
//根据字典序获取KV键值集合
GetStateByRange(startKey, endKey string) (StateQueryIteratorInterface,
error)
//获取联合索引KV键值
GetStateByPartialCompositeKey(objectType string, keys []string)
(StateQueryIteratorInterface, error)
//创建联合索引
CreateCompositeKey(objectType string, attributes []string) (string,
error)
//分割联合索引
SplitCompositeKey(compositeKey string) (string, []string, error)
//获取KV键值结合
GetQueryResult(query string) (StateQueryIteratorInterface, error)
//获取对应Key下的历史更新记录
GetHistoryForKey(key string) (HistoryQueryIteratorInterface, error)
}
2、启动流程
初始化:读取默认配置,创建到Peer节点的gRPC的连接;
创建有限状态机结构(FSM):FSM会根据收到的消息和当前的状态来触发状态转移,并
执⾏提前设置的操作;同时Peer节点也有类似的FSM结构来管理消息响应;
发送注册gRPC消息:利⽤创建好的gPRC连接向Peer发送⾃身的注册消息;
消息处理循环:注册成功之后,等待接收来⾃Peer的消息以及⾃身的状态迁移
(next St at e)消息;
3、链码容器与Peer节点通信过程
链码容器与Peer节点建立通信
链码容器向Peer节点发送⾃⼰的注册信息ChaincodeMessage_REGIST ER;
Peer节点接收到之后注册到本地的⼀个Handle结构,返回
ChaincodeMessage_REGIST ERED消息给链码容器,状态更新为est ablished;
链码容器收到之后,什么也不做,只把状态更新为est ablished;
Peer节点继续发送ChaincodeMessage_READY消息给链码容器,状态更新为READY;
链码容器收到之后,还是什么也不做,状态继续更新为READY;
链码容器初始化
Peer节点继续发送ChaincodeMessage_INIT 消息给链码容器;
链码容器收到之后,调⽤Handler.handleInit ()⽅法进⾏初始化。主要是初始化所需的
ChaincodeSt ub结构,调⽤链码代码中的Init ()⽅法;
链码容器初始化成功之后,返回ChaincodeMessage_COMPLET ED消息给Peer节点。
此时,链码容器进⼊可被调⽤(Invoke)状态;
链码容器的Invoke()方法被Peer节点调用
Peer节点如果要调⽤链码,发送ChaincodeMessage_T RANSACT ION消息给链码容
器;
链码容器收到,调⽤Invoke()⽅法,根据⽤户在Invoke()⽅法的具体实现逻辑,可以返
回如下⼏类的响应消息给Peer节点:
ChaincodeMessage_GET_HISTORY_FOR_KEY
ChaincodeMessage_GET_QUERY_RESULT
ChaincodeMessage_GET_STATE
ChaincodeMessage_GET_STATE_BY_RANGE
ChaincodeMessage_QUERY_STATE_CLOSE
ChaincodeMessage_QUERY_STATE_NEXT
ChaincodeMessage_INVOKE_CHAINCODE
Peer节点收到消息,进⾏处理,并返回ChaincodeMessage_RESPONSR消息给链码容
器;
链码容器收到消息,返回ChaincodeMessage_COMPLET E消息给Peer节点,代表链码
的⼀次Invoke()⽅法调⽤成功结束;
⼆、Endorser背书
1、Endorser背书流程
Peer节点执行背书的时候,会执行/core/endorser/endorser.go的
ProcessProposal()方法,具体流程如下:
检验提案消息的合法性,以及相关的权限;
调⽤Validat eProposalMessage()⽅法对提案格式进⾏检查
Channel头部格式:头部类型是否合法;
签名头格式: 是否包括了nonce和creat or数据;
签名域: creat or证书MSP是否合法,签名是否正确;
如果是系统链码调⽤,检查是否是允许从外部调⽤的三种SCC之⼀;
如果chainID不为空,获取对应chain的账本结构;检查T xID的唯⼀性,确保同⼀个交易
未曾提交到账本中;
对于⽤户链码调⽤,检查ACL:资源为Propose,默认策略是Channel/Writ ers;
模拟执⾏提案:启动链码容器,对世界状态的最新版本进⾏临时快照,基于临时快照执⾏
链码,将结果记录在读写集中;
如果chainID不为空,获取对应账本的交易模拟器(T xSimulat or)和历史查询器
(Hist oryQueryExecut or),这两个结构将会在执⾏链码时被使⽤;
如果chainID不为空,调⽤simulat orProposal()⽅法获取模拟执⾏的结果,检查返回的
响应Response状态,如果状态码不⼩于500则创建并返回⼀个失败的
ProposalResponse;
对提案内容和读写集合进⾏签名,并返回提案响应消息;
如果chainID不为空,调⽤endorserProposal()⽅法,利⽤ESCC,对之前得到的模拟执
⾏的结果进⾏背书;
检查模拟执⾏提案时调⽤simulat orProposal()⽅法返回的响应Response的状态码,如
果状态码不⼩于400(也就是在400与499之间),返回含有检查链码模拟执⾏结果状
态的ProposalResponse以及链码错误chaincodeError;
将调⽤endorserProposal()⽅法返回的response.Payload值覆盖掉之前调⽤
simulat orProposal()⽅法返回的ProposalResponse.Response.Payload值,因为原来
的ProposalResponse.Response.Payload值含有链码调⽤的结果;
2、交易模拟器执行流程
其中调用simulatorProposal()方法,模拟执行的流程如下
从提案结构的载荷(Payload)中提取ChaincodeInvovat ionSpec结构,其中包括了所调⽤链
码(包括系统链码和⽤户链码)的路径、名称和版本,以及调⽤时传⼊的参数列表;
检查ESCC和VSCC(这⼀步还未实现);
如果调⽤的是⽤户链码,检查提案中的实例化策略跟账本上记录的该链码的实例化策略是
否⼀致。账本上记录的该链码的实例化策略是在该链码被安装的时候就指定了的。这⼀步
是为了防⽌有⼈修改权限在其他通道⾮法实例化;
调⽤callChaincode()⽅法执⾏Proposal,返回Response和Event ;通过⼀层⼀层调⽤
Support Impl.Execut e()、Execut eChaincode()、Execut e()、ChaincodeSupport .Launch()
以及ChaincodeSupport .Execut e()⽅法,把交易模拟器和历史查询器通过上下⽂结构体传
⼊⽅法中,创建并启动链码容器,创建链码gRPC消息,发送消息给CC(Chaincode)容
器,执⾏相关合约,接收链码容器返回的响应(ChaincodeMessage结构)。在此过程
中,会将读写集记录到交易模拟器结构体中;
如果chainID不为空,调⽤Get T xSimulat orResult s()⽅法拿到执⾏结果
(T xSimulat orResult s结构),从中解析读写集数据;
返回链码标准数据结构(ChaincodeDefinit ion)、响应消息(ChaincodeMessage)、交易读
写集(PubSimulat ionResult s)、链码事件(ChaincodeEvent );
三、Orderer排序服务
1、启动过程
从本地配置⽂件和环境变量中读取配置信息,构建配置树结构
初始化⽇志
启动gorout ing,提供profile服务,默认6060端⼝
配置gRPC监听服务,创建gRPC服务连接,默认7050端⼝
初始化本地MSP组件
创建本地MSP签名者实体
创建账本管理器,⽤于注册Orderer节点上的所有通道(包括系统通道和应⽤通道)
创建操作账本的⼯⼚结构LedgerFact ory,LedgerFact ory负责执⾏存储区块、查询区
块、查询交易信息等操作;
如果是⾸次执⾏,初始化系统通道,将系统通道信息接⼊区块中;
初始化共识插件,共识插件负责跟后台的队列打交道;
⽀持内部实现的solo共识
⽀持Kafka共识
创建各个账本的管理器Manager对象,开启gorout ine在后台启动共识过程,这⾥会启
动本地所有账本的共识过程,包括系统账本和普通交易账本。启动⽅式是后台开启协
程,是为了防⽌阻塞mult iChain.Manager;
负责维护通道配置、通道账本等资源
创建Orderer排序服务器
将Orderer排序服务器注册到gRPC服务器上
启动gRPC服务器提供Orderer服务
2、共识插件启动过程(Kafka集群)
创建Producer结构
发送Connect 消息给Kafka,检测Kafka是否在运⾏
创建处理对应Kafka T opic下的Consumer结构
配置从指定part it ion读取消息的Part it ionConsumer结构
不断从Kafka中读取消息,处理消息
3、排序后消息的处理
当前收到消息数达到Bat chSize.MaxMessageCount 或消息尺⼨过⼤,或者超时时间达到
Bat chT imeout ,则发送分块消息T T C-X到Kafka;
Kafka集群维护多个t opic分区。Kafka通过共识算法来确保写⼊到分区后的消息的⼀致
性。即⼀旦写⼊分区,任何Orderer节点看到的都是相同的消息队列;
Orderer节点在启动后,默认对本地账本对应的Kafka分区数据进⾏监听,开启⼀个
Kafka Consumer端不断从Kafka拉取新的消息,并对消息进⾏处理。满⾜⼀定策略下还会将
消息打包为区块。⽐如收到分块消息T CC-X,或者配置交易的时候,Orderer节点会切分之前
从Kafka中收到的消息为区块,记录到本地账本结构中;
具体逻辑代码实现如下:
1
2
3
4
5
6
7
8
9
10
11
for {
select {
//链故障,需要退出
case <-chain.haltChan:
//从Kafka中消费消息发生错误
case kafkaErr := <-chain.channelConsumer.Errors():
select {
//如果接收不到Kafka的错误信息,则关闭与Kafka的连接,稍后重连
case <-chain.errorChan:
default:
close(chain.errorChan)
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
}
//成功读取到Kafka消息,进行逻辑处理
case in, ok := <-chain.channelConsumer.Messages():
select {
case <-chain.errorChan:
chain.errorChan = make(chan struct{})
default:
}
switch msg.Type.(type) {
//Kafka连接消息,可以忽略
case *ab.KafkaMessage_Connect:
//TTC消息,打包现有的一批消息为区块
case *ab.KafkaMessage_TimeToCut:
//(需要核心处理)Fabric相关消息,包括配置更新,应用通道交易等
case *ab.KafkaMessage_Regular:
processRegular() //主要是读取消息
}
//定期发送TTC消息到Kafka
case <-timer:
}
}
4、对外的服务
Broadcast()接口负责接收Client发送过来的已经收集到背书策略的交易提案;
Deliver()接口负责将打包好的区块信息发送给订阅了该通道的组织(Organizition)
中的Peer Leader节点;
内部工作流程图如下:
四、交易流程
发送签名提案消息到Endorser背书节点请求处理:Client 构造签名提案信息
(SignedProposal类型),通过调⽤Endorser背书服务客户端的ProcessProposal()接
⼝,提交消息到Endorser背书节点,请求模拟执⾏交易提案并签名背书;
Endorser背书节点模拟执⾏交易提案并签名背书:
检查签名提案消息的格式合法性与签名有效性,包括通道头部、签名头部、签名域、交
易ID、消息扩展域的ChaincodeId属性与PayloadVisibilit y可⻅性模式等;
检查提案消息的创建者是否满⾜指定通道上的通道访问权限,
即/Channel/Applicat ion/Writ ers写权限;
检查并启动链码容器以模拟执⾏交易提案,并将模拟执⾏结果暂时保存在交易模拟器
中,等待排序共识与交易验证,⽽不是直接更新到账本中。其中,交易模拟执⾏结果采
⽤状态数据读写集(读数据的键和版本、写数据的键值)记录交易造成的状态变更结
果;
调⽤ESCC系统链码对该提案信息的模拟执⾏结果读写集等进⾏签名背书;
Endorser背书节点向客户端发送提案响应消息,并分发隐私数据明⽂。Endorser背书节点
基于背书信息、模拟执⾏结果等构造提案响应消息(ProposalResponse类型),并回复
给客户端。⽬前,模拟执⾏结果读写集包含公有数据(包括公共数据与隐私数据哈希值)
与私有数据(或隐私数据)。其中,公有数据交由Orderer节点进⾏排序出块,再提交到
账本区块数据⽂件,并⼴播到该通道上的所有节点。如果模拟执⾏结果中还存在有效的隐
私数据明⽂,则Endorser背书节点通过Gossip消息协议将隐私数据明⽂发送给通道内授权
的其他节点(由隐私数据集合配置的签名策略决定),交由t ransient 隐私数据存储对象暂
时保存到本地的t ransient 隐私数据库(LevelDB),并在提交账本时存储到隐私数据库
(LevelDB),同时清理t ransient 隐私数据库中的过期数据;
处理提案响应消息:Client 解析Endorser背书节点回复的提案响应消息,获取背书结果并
检查提案响应消息状态的合法性,以判断是否收集到了⾜够多的符合要求的背书签名信
息;
发送交易数据给Orderer服务节点请求排序:Client 收集到⾜够多的符合要求的Endorser背
书签名之后,基于模拟执⾏结果、背书签名等构造合法的签名交易信息(Envelope类
型),通过Broadcast ()服务接⼝将信息提交给Orderer节点,请求交易排序处理。其中,
配置交易信息是不需要经过Endorser背书节点处理的;
Orderer服务节点对交易进⾏排序并构造新区块:Orderer排序节点提供Solo类型、Kafka
类型等共识组件,对符合通道处理要求的合法交易信息(包括普通交易信息、配置交易信
息等)进⾏排序并达成⼀致观点,对⼀段时间内接收到的⼀批交易信息按照打包交易的出
块规则(出块周期时⻓、区块字节数限制、配置交易单独出块等)构造新区块,创建应⽤
通道或更新通道配置,同时提交账本;
Leader节点请求Orderer服务节点发送通道账本区块:Leader主节点通过Deliver()服务接⼝
代表组织从Orderer节点请求通道账本上所有的区块数据,并通过Gossip消息协议分发到
组织内的其他Peer节点。如果请求的区块数据不存在,则Orderer节点默认阻塞等待,直
到指定区块创建完成并提交账本,再将该区块发送给Leader主节点;
Commit t er记账节点验证交易并提交账本
检查交易消息格式的正确性、签名合法性、交易内容是否篡改、消息头部的合法性等;
调⽤VSCC系统链码,验证收集的签名背书结果是否符合指定的背书策略;
对模拟结果中公有数据(含有公共数据与隐私数据哈希值等的区块数据)的读写集执⾏
MVCC检查,针对单个键查询、键范围查询、隐私数据哈希值三种情况,检查读数据版
本与交易时的账本是否⼀致,即是否存在读写冲突,并将存在冲突的交易标记为⽆效交
易;
验证模拟结果中隐私数据的正确性,遍历区块中有效交易的隐私数据读写集哈希值,取
出对应交易的原始隐私数据读写集明⽂,重新计算其哈希值并对两者进⾏⽐较。如果完
全相同,则说明该交易的隐私数据是真实有效的;
数据写⼊
保存所有的区块数据(公共数据部分)到区块数据⽂件中;
保存所有的私有数据(t ransient 隐私数据库部分)读写集到隐私数据库中;
建⽴区块索引信息到区块索引数据库;
将最新的有效交易数据(包含公共数据读写集、隐私数据读写集、隐私数据读写集
哈希值)更新到状态数据库;
将区块中经过Endorser背书的有效交易数据同步到历史数据库;
清理t ransient 隐私数据库中的过期数据;
Leader主节点分发数据与状态同步:Leader主节点基于Gossip消息协议将区块数据分发到
组织内的其他节点上。同时,节点之间通过反熵算法等机制主动拉取缺失的数据(区块数
据与隐私数据)、节点身份信息等,以确保组织内所有节点上的账本数据等信息保持同
步;
其他Commit t er记账节点(除了Leader主节点)验证交易并提交账本(同上);
五、指令分析