1. 币快讯首页
  2. 技术

精通Filecoin:Lotus真实数据处理之Client处理存储

精通Filecoin:Lotus真实数据处理之Client处理存储接上文:《精通 Filecoin:Lotus 真实数据处理之 Client 初始化

客户端发起交易之后,最终调用 Lotus(Client)API 的 ClientStartDeal 方法对交易进行处理。这个方法最终调用 Client 对象的 ProposeStorageDeal 方法处理交易:
result, err := a.SMDealClient.ProposeStorageDeal(
    ctx,
    params.Walle   &providerInfo,
    params.Data,    // 包含了客户端的数据
    dealStart,
    calcDealExpiration(params.MinBlocksDuration, md, dealStart),
    params.EpochPrice,
    big.Zero(),
    rt,
    params.FastRetrieval,
    params.VerifiedDeal,
)

Client 对象的 ProposeStorageDeal 方法处理如下:

  • 计算客户数据的 commP。
    commP, pieceSize, err := clientutils.CommP(ctx, c.pio, rt, >

  • 检查数据的大小。如果不符合,则直接返回。
  • 创建交易提案对象。
    dealProposal := market.DealProposal{
        PieceCID:             commP,
        PieceSize:            pieceSize.Padded(),
        Client:               addr,
        Provider:             info.Address,
        StartEpoch:           startEpoch,
        EndEpoch:             endEpoch,
        StoragePricePerEpoch: price,
        ProviderCollateral:   abi.NewTokenAmount(int64(pieceSize)), // TODO: real calc
        ClientCollateral:     big.Zero(),
        VerifiedDeal:         verifiedDeal,
    }

  • 对交易提案进行签名。
    clientDealProposal, err := c.node.SignProposal(ctx, addr, dealProposal)

  • 把交易提案转化为 ipld 节点对象。
    proposalNd, err := cborutil.AsIpld(clientDealProposal)

  • 创建客户端交易对象。
    deal := &storagemarket.ClientDeal{
        ProposalCid:        proposalNd.Cid(),
        ClientDealProposal: *clientDealProposal,
        State:              storagemarket.StorageDealUnknown,
        Miner:              info.PeerID,
        MinerWorker:        info.Worker,
        DataRef:            >

  • 调用 fsm 状态组的 Begin 的方法,生成一个状态机,并开始跟踪客户端交易对象。
    err = c.statemachines.Begin(proposalNd.Cid(), deal)

  • 向 fsm 状态组发送 ClientEventOpen 事件。
    err = c.statemachines.Send(deal.ProposalCid, storagemarket.ClientEventOpen)

    当状态机收到这个事件后,因为初始状态为默认值 0,即 StorageDealUnknown,事件处理器对象经过内部处理把状态从 StorageDealUnknown 修改为 StorageDealEnsureClientFunds,从而调用其处理函数 EnsureClientFunds 确定是否接收交易。

  • 返回结果。
    return &storagemarket.ProposeStorageDealResult{
            ProposalCid: deal.ProposalCid,
        }, c.discovery.AddPeer(>



    1、`EnsureClientFunds` 函数

    这个函数用来确认客户端有足够的资金来开始交易提案。

    这个函数的执行如下:

  • 获取客户适配器对象。
    node := environment.Node()

  • 获取区块链最顶端 tipset 对应的 tipset key 和 tipset 的高度。
    tok, _, err := node.GetChainHead(ctx.Context())

  • 确认客户端有足够多的资金来进行交易。
    mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Client, deal.Proposal.Client, deal.Proposal.ClientBalanceRequirement(), tok)

    EnsureFunds 方法在确认客户资金过程中,会调用 market Actor 对象的 AddBalance 方法进行处理。

  • 如果客户有足够多的资金,则调用 fsm 上下文对象的 Trigger 方法,通过事件处理器生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ClientEventFundsEnsured。
    if mcid == cid.Undef {
        return ctx.Trigger(storagemarket.ClientEventFundsEnsured)
    }

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealEnsureClientFunds 修改为 StorageDealFundsEnsured,从而调用其处理函数 ProposeDeal 处理提议的交易。

  • 2、`ProposeDeal` 函数

    这个函数处理提议的交易,把交易发送给 provider 对象。

  • 生成交易提议对象。
    proposal := network.Proposal{
        DealProposal:  &deal.ClientDealProposal,
        Piece:         deal.DataRef,
        FastRetrieval: deal.FastRetrieval,
    }

  • 创建一个到矿工的交易流。
    s, err := environment.NewDealStream(ctx.Context(), deal.Miner)

    客户环境对象的 NewDealStream 方法直接底层网络对象(libp2pStorageMarketNetwork)的同名方法进行处理。后者以指定的协议创建到指定矿工的流,并返回包装之后的流对象。包装之后的流对象是 dealStream 对象(storagemarket/network/deal_stream.go)。

    当存储矿工收到这个请求后,使用自身的 handleNewDealStream 方法处理处理这个流。在这个方法中对流进行简单包装,然后调用矿工对象的 HandleDealStream 方法处理客户交易。

  • 通过流把交易提案提交到矿工。
    err := s.WriteDealProposal(proposal);

    这个过程会把交易提案对象转化为 cbor 编码的对象。

  • 读取流返回的响应对象。
    resp, err := s.ReadDealResponse()

    等待矿工确定是否接收交易。结果是矿工签名的消息。内容如下:
    network.Response{
        State:    storagemarket.StorageDealWaitingForData,
        Proposal: deal.ProposalCid,
    }

  • 关闭流。
    err = s.Close()

  • 获取当前链顶部相关信息。
    tok, _, err := environment.Node().GetChainHead(ctx.Context())

  • 验证矿工返回的响应。
    err := clientutils.VerifyResponse(ctx.Context(), resp, deal.MinerWorker, tok, environment.Node().VerifySignature);

  • 如果响应状态不是等待数据,则发送 ClientEventUnexpectedDealState 事件。
    if resp.Response.State != storagemarket.StorageDealWaitingForData {
        return ctx.Trigger(storagemarket.ClientEventUnexpectedDealState, resp.Response.State, resp.Response.Message)
    }

  • 发送初始化数据传输事件。
    return ctx.Trigger(storagemarket.ClientEventInitiateDataTransfer)

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealFundsEnsured 修改为 StorageDealStartDataTransfer,从而调用其处理函数 InitiateDataTransfer 处理提议的交易。

  • 3、`InitiateDataTransfer` 函数

    初始化数据传输。

  • 检查传输类型。
    if deal.DataRef.TransferType == storagemarket.TTManual {
        log.Infof(“manual >

  • 推送数据到 miner。
    err := environment.StartDataTransfer(ctx.Context(),
        deal.Miner,
        &requestvalidation.StorageDataTransferVoucher{Proposal: deal.ProposalCid},
        deal.DataRef.Root,
        shared.AllSelector(),
    )

    环境对象的 StartDataTransfer 方法内容如下:
    _, err := c.c.>

  • 如果有错误,则发送错误事件。
    if err != nil {
        return ctx.Trigger(storagemarket.ClientEventDataTransferFailed, xerrors.Errorf(“failed to open push >

  • 正常情况下,发送传输初始化完毕事件。
    return ctx.Trigger(storagemarket.ClientEventDataTransferInitiated)

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealStartDataTransfer 修改为 StorageDealTransferring,因为这个状态没有任何处理函数,所有直接返回。

    因为上面已经打开通道进行数据传输,所以状态机一直等待数据传输。在正常情况下,当数据传输完成之后,发送 ClientEventDataTransferComplete 事件。

    当状态机收到这个事件后,经过事件处理器把状态从 StorageDealTransferring 修改为 StorageDealCheckForAcceptance,从而调用其处理函数 CheckForDealAcceptance 处理交易。

  • 5、`CheckForDealAcceptance`

    这个函数用来检查矿工对数据验证的结果,并根据结果进行不同的。

  • 调用环境对象的 GetProviderDealState 方法,获取 provider 对交易的处理。
    dealState, err := environment.GetProviderDealState(ctx.Context(), deal.ProposalCid)

    环境对象的这个方法直接调用 Client 对象的 GetProviderDealState 方法进行处理。后者创建一个交易状态的流,然后向矿工进行请求,并处理结果。

    Provider 对象使用自身的 HandleDealStatusStream 方法对这个流进行处理。它会从状态机中获取交易对象,并把交易对象通过流返回到这里。

  • 如果交易对象状态为错误,则发送被拒事件。
    if isFailed(dealState.State) {
        return ctx.Trigger(storagemarket.ClientEventDealRejected, dealState.State, dealState.Message)
    }

  • 如果状态为已接收,则发送接收事件。
    if isAccepted(dealState.State) {
        if *dealState.ProposalCid != deal.ProposalCid {
            return ctx.Trigger(storagemarket.ClientEventResponseDealDidNotMatch, *dealState.ProposalCid, deal.ProposalCid)
        }
    return ctx.Trigger(storagemarket.ClientEventDealAccepted, dealState.PublishCid)

    }

    当状态机收到 ClientEventDealAccepted 这个事件后,经过事件处理器把状态从 StorageDealCheckForAcceptance 修改为 StorageDealProposalAccepted,从而调用其处理函数 ValidateDealPublished 处理交易。

  • 如果状态不是前面需要的状态,则一直等待矿工处理交易。
    return waitAgain(ctx, environment, false)

    waitAgain 函数内容如下:
    func waitAgain(ctx fsm.Context, environment ClientDealEnvironment, pollError bool) error {
        t := time.NewTimer(environment.PollingInterval())
    go func() {
        select {
        case <-t.C:
            _ = ctx.Trigger(storagemarket.ClientEventWaitForDealState, pollError)
        case <-ctx.Context().Done():
            t.Stop()
            return
        }
    }()

    return nil

    }

  • 6、`ValidateDealPublished` 函数

    这个函数用来确认交易信息已经被发布到链上。

  • 调用 Lotus Client 适配器验证交易已经被发布到链上。
    dealID, err := environment.Node().ValidatePublishedDeal(ctx.Context(), deal)

    if err != nil {
        return ctx.Trigger(storagemarket.ClientEventDealPublishFailed, err)
    }

    Lotus Client 适配器的这个方法等待交易信息被发布到链上,默认等待5个区块确认。

  • 发送交易已被发布到链上事件。
    return ctx.Trigger(storagemarket.ClientEventDealPublished, dealID)

    当状态机收到 ClientEventDealPublished 这个事件后,经过事件处理器把状态从 StorageDealProposalAccepted 修改为 StorageDealSealing,从而调用其处理函数 VerifyDealActivated 处理交易。在调用函数之前,使用指定的交易 ID 修改客户交易对象为参数指定的 dealID。

  • 7、`VerifyDealActivated` 函数

    这个函数用来确认交易被密封在某个扇区,并且是活跃的。

  • 生成一个回调函数。
    cb := func(err error) {
        if err != nil {
            _ = ctx.Trigger(storagemarket.ClientEventDealActivationFailed, err)
        } else {
            _ = ctx.Trigger(storagemarket.ClientEventDealActivated)
        }
    }

  • 调用 Lotus Client 适配器的 OnDealSectorCommitted 方法,确认交易被密封在某个扇区,并且是活跃的。
    if err := environment.Node().OnDealSectorCommitted(ctx.Context(), deal.Proposal.Provider, deal.DealID, cb); err != nil {
        return ctx.Trigger(storagemarket.ClientEventDealActivationFailed, err)
    }

    Lotus Client 适配器的 OnDealSectorCommitted 方法会调用状态管理器来获取扇区的相关状态,比如是否预提交上链,是否提交已经上链等。

    无论能否确认都会调用回调函数,如果确认是 Ok的,则发送 ClientEventDealActivated 事件,如果不 OK,则发送 ClientEventDealActivationFailed 事件。

    如果在确认过程中出现错误,则发送 ClientEventDealActivationFailed 事件。

    对于正常情况,当状态机收到 ClientEventDealActivated 这个事件后,经过事件处理器把状态从 StorageDealSealing 修改为 StorageDealActive,从而调用其处理函数 WaitForDealCompletion 处理交易。

  • 如果确认成功,则直接返回。
    return nil

  • 8、`WaitForDealCompletion` 函数

  • 获取 Lotus Client 适配器。
    node := environment.Node()

  • 生成失效回调函数。
    expiredCb := func(err error) {
        if err != nil {
            _ = ctx.Trigger(storagemarket.ClientEventDealCompletionFailed, xerrors.Errorf(“deal expiration err: %w”, err))
        } else {
            _ = ctx.Trigger(storagemarket.ClientEventDealExpired)
        }
    }

  • 生成惩罚回调函数。
    slashedCb := func(slashEpoch abi.ChainEpoch, err error) {
        if err != nil {
            _ = ctx.Trigger(storagemarket.ClientEventDealCompletionFailed, xerrors.Errorf(“deal slashing err: %w”, err))
        } else {
            _ = ctx.Trigger(storagemarket.ClientEventDealSlashed, slashEpoch)
        }
    }

  • 调用 Lotus Client 适配器的 OnDealExpiredOrSlashed 方法,当交易失效或惩罚时候通知客户端。
    if err := node.OnDealExpiredOrSlashed(ctx.Context(), deal.DealID, expiredCb, slashedCb); err != nil {
        return ctx.Trigger(storagemarket.ClientEventDealCompletionFailed, err)
    }

  • 返回。
    return nil

  • 免责声明:本文来自乔疯发布.观点仅代表作者本人,不代表币快讯赞同其观点或证实其描述,版权归原作者所有,转载请注明出处:https://www.bikuaixun.com.cn/22788.html

    温馨提示:投资有风险,入市须谨慎。本资讯不作为投资理财建议。