1. 交易的签名
  2. 理解收据receipt
  3. 理解区块
  4. 理解交易
  5. blockchain核心
  6. 布隆过滤器原理
  7. forkId 解读
  8. oracle 原理和实现
  9. 交易池分析
  10. TxList 解读
  11. MPT树
  12. 区块同步
  13. geth源码学习——介绍
  14. How Geth starts its server

博主的朋友写的

  • 看到core\blockchain.go的时候大量涉及该部分知识,故在此参考大佬博客加之自己的理解先行总结
  • 本文仅仅是简单总结了一下文件结构和重要函数功能,详细函数分析请参考downloader 同步

文件结构

downloader 模块的代码位于 eth/downloader 目录下。主要的功能代码分别是:

  • downloader.go :实现了区块同步逻辑

  • peer.go :对区块各个阶段的组装,下面的各个FetchXXX 就是很依赖这个模块。

  • queue.go :对eth/peer.go的封装

  • statesync.go :同步state对象

  • 注意:

    • downloader是一个下载器,从远程网络节点中获取 hashes 和 blocks。

    • fetcher则收集网络其他以太坊节点发过来的同步通知,进行验证,并做出相应的处理。

    • peers是经过验证可信任的通信节点的集合。

    • queue represents hashes that are either need fetching or are being fetched

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      // queue represents hashes that are either need fetching or are being fetched
      type queue struct {
      mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching

      // Headers are "special", they download in batches, supported by a skeleton chain
      headerHead common.Hash // Hash of the last queued header to verify order
      headerTaskPool map[uint64]*types.Header // Pending header retrieval tasks, mapping starting indexes to skeleton headers
      headerTaskQueue *prque.Prque // Priority queue of the skeleton indexes to fetch the filling headers for
      headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable
      headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations
      headerResults []*types.Header // Result cache accumulating the completed headers
      headerProced int // Number of headers already processed from the results
      headerOffset uint64 // Number of the first header in the result cache
      headerContCh chan bool // Channel to notify when header download finishes

      // All data retrievals below are based on an already assembles header chain
      blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
      blockTaskQueue *prque.Prque // Priority queue of the headers to fetch the blocks (bodies) for
      blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations

      receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers
      receiptTaskQueue *prque.Prque // Priority queue of the headers to fetch the receipts for
      receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations

      resultCache *resultStore // Downloaded but not yet delivered fetch results
      resultSize common.StorageSize // Approximate size of a block (exponential moving average)

      lock *sync.RWMutex
      active *sync.Cond
      closed bool

      lastStatLog time.Time
      }

同步模式

以太坊中区块同步包含以下三种模式:

  • full sync:

    full 模式会在数据库中保存所有区块数据,同步时从远程节点同步 header 和 body 数据,而 state 和 receipt 数据则是在本地计算出来的。

    在 full 模式下,downloader 会同步区块的 header 和 body 数据组成一个区块,然后通过 blockchain 模块的 BlockChain.InsertChain 向数据库中插入区块。在 BlockChain.InsertChain 中,会逐个计算和验证每个块的 staterecepit 等数据,如果一切正常就将区块数据以及自己计算得到的 staterecepit 数据一起写入到数据库中。

  • fast sync:

    fast 模式下,recepit 不再由本地计算,而是和区块数据一样,直接由 downloader 从其它节点中同步;state 数据并不会全部计算和下载,而是选一个较新的区块(称之为 pivot)的 state 进行下载,以这个区块为分界,之前的区块是没有 state 数据的,之后的区块会像 full 模式下一样在本地计算 state。因此在 fast 模式下,同步的数据除了 header 和 body,还有 receipt,以及 pivot 区块的 state

    因此 fast 模式忽略了大部分 state 数据,并且使用网络直接同步 receipt 数据的方式替换了 full 模式下的本地计算,所以比较快。

  • light sync:从网络中同步所有区块头,不去同步区块体,也不去同步状态数据,仅在需要相应区块和状态数据时从网络上获取

简单总结:

SyncMode:

  • FullSync:从完整区块同步整个区块链历史
  • FastSync:快速下载 Header,仅在链头处完全同步
  • LightSync:仅下载 Header,然后终止

区块下载

区块下载流程示意图如下所示:

image-20220331155831706

首先根据 Synchronise 开始区块同步,通过 findAncestor 找到指定节点的共同祖先,并在此高度进行同步,同时开启多个 goroutine 同步不同的数据:header、receipt、body,假如同步高度为 100 的区块,必须先 header 同步成功同步完成才可以进行 body 和 receipts 的同步,而每个部分的同步大致都是由 FetchParts 来完成的,里面包含了各个 Chan 的配合,也会涉及不少的回调函数

源码分析

数据结构

downloader 数据结构如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
type Downloader struct {

mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mux *event.TypeMux // Event multiplexer to announce sync operation events

checkpoint uint64 // Checkpoint block number to enforce head against (e.g. fast sync)
genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT)
queue *queue // Scheduler(调度程序)for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed

stateDB ethdb.Database // Database to state sync into (and deduplicate via)
stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks

// Statistics 统计信息,
syncStatsChainOrigin uint64 // Origin block number where syncing started at
syncStatsChainHeight uint64 // Highest block number known when syncing started
syncStatsState stateSyncStats
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields

lightchain LightChain
blockchain BlockChain

// Callbacks
dropPeer peerDropFn // Drops a peer for misbehaving

// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
notified int32
committed int32
ancientLimit uint64 // The maximum block number which can be regarded as ancient data.

// Channels 这些通道很重要
headerCh chan dataPack // Channel receiving inbound block headers header的输入通道,从网络下载的header会被送到这个通道
bodyCh chan dataPack // Channel receiving inbound block bodies bodies的输入通道,从网络下载的bodies会被送到这个通道
receiptCh chan dataPack // Channel receiving inbound receipts receipts的输入通道,从网络下载的receipts会被送到这个通道
bodyWakeCh chan bool // Channel to signal the block body fetcher of new tasks 用来传输body fetcher新任务的通道
receiptWakeCh chan bool // Channel to signal the receipt fetcher of new tasks 用来传输receipt fetcher 新任务的通道
headerProcCh chan []*types.Header // Channel to feed the header processor new tasks 通道为header处理者提供新的任务

// State sync
pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
pivotLock sync.RWMutex // Lock protecting pivot header reads from updates

snapSync bool // Whether to run state sync over the snap protocol
SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now
stateSyncStart chan *stateSync //启动新的state fetcher
trackStateReq chan *stateReq
stateCh chan dataPack // Channel receiving inbound node state data State的输入通道,从网络下载的State会被送到这个通道

// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel and peer in delivers
cancelWg sync.WaitGroup // Make sure all fetcher goroutines have exited.

quitCh chan struct{} // Quit channel to signal termination
quitLock sync.Mutex // Lock to prevent double closes

// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

构造方法

New 用于初始化一个 Downloader 对象,具体代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
if lightchain == nil {
lightchain = chain
}
dl := &Downloader{
stateDB: stateDb,
stateBloom: stateBloom,
mux: mux,
checkpoint: checkpoint,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1),
quitCh: make(chan struct{}),
stateCh: make(chan dataPack),
SnapSyncer: snap.NewSyncer(stateDb),
stateSyncStart: make(chan *stateSync),
syncStatsState: stateSyncStats{
processed: rawdb.ReadFastTrieProgress(stateDb),
},
trackStateReq: make(chan *stateReq),
}
go dl.qosTuner() //计算rttEstimate和rttConfidence
go dl.stateFetcher() //启动stateFetcher的任务监听
return dl
}

同步下载

区块同步始于 Synchronise 函数,在这里会直接调用 synchronise 进行同步,如果同步过程中出现错误,则删除掉 Peer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
err := d.synchronise(id, head, td, mode)

switch err {
case nil, errBusy, errCanceled:
return err
}
if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) ||
errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
} else {
//删除操作
d.dropPeer(id)
}
return err
}
log.Warn("Synchronisation failed, retrying", "err", err)
return err
}

synchronise 函数实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
// Mock out the synchronisation if testing
if d.synchroniseMock != nil {
return d.synchroniseMock(id, hash)
}
// Make sure only one goroutine is ever allowed past this point at once // 只能运行一个, 检查是否正在运行
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
return errBusy
}
defer atomic.StoreInt32(&d.synchronising, 0)

// Post a user notification of the sync (only once per session) // 发布同步的用户通知(每个会话仅一次)
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
log.Info("Block synchronisation started")
}
// If we are already full syncing, but have a fast-sync bloom filter laying
// around, make sure it doesn't use memory any more. This is a special case
// when the user attempts to fast sync a new empty network.
if mode == FullSync && d.stateBloom != nil {
d.stateBloom.Close()
}
// If snap sync was requested, create the snap scheduler and switch to fast
// sync mode. Long term we could drop fast sync or merge the two together,
// but until snap becomes prevalent, we should support both. TODO(karalabe).
if mode == SnapSync {
if !d.snapSync {
log.Warn("Enabling snapshot sync prototype")
d.snapSync = true
}
mode = FastSync
}
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems) // 重置queue的状态
d.peers.Reset() // 重置peer的状态

for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { // 清空d.bodyWakeCh, d.receiptWakeCh
select {
case <-ch:
default:
}
}
for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} { //清空d.headerCh, d.bodyCh, d.receiptCh
for empty := false; !empty; {
select {
case <-ch:
default:
empty = true
}
}
}
for empty := false; !empty; { // 清空headerProcCh
select {
case <-d.headerProcCh:
default:
empty = true
}
}
// Create cancel channel for aborting mid-flight and mark the master peer
d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
d.cancelPeer = id
d.cancelLock.Unlock()

defer d.Cancel() // No matter what, we can't leave the cancel channel open

// Atomically set the requested sync mode
atomic.StoreUint32(&d.mode, uint32(mode))

// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
if p == nil {
return errUnknownPeer
}
return d.syncWithPeer(p, hash, td) // 基于哈希链从指定的peer和head hash开始块同步
}

syncWithPeer 函数代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// filedir:go-ethereum-1.10.2\eth\downloader\downloader.go  L448
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{})
defer func() {
// reset on error
if err != nil {
d.mux.Post(FailedEvent{err})
} else {
latest := d.lightchain.CurrentHeader()
d.mux.Post(DoneEvent{latest})
}
}()
if p.version < 64 {
return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, 64)
}
mode := d.getMode()

log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
defer func(start time.Time) {
log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
}(time.Now())

// Look up the sync boundaries: the common ancestor and the target block
latest, pivot, err := d.fetchHead(p)
if err != nil {
return err
}
if mode == FastSync && pivot == nil {
// If no pivot block was returned, the head is below the min full block
// threshold (i.e. new chian). In that case we won't really fast sync
// anyway, but still need a valid pivot block to avoid some code hitting
// nil panics on an access.
pivot = d.blockchain.CurrentBlock().Header()
}
height := latest.Number.Uint64()

origin, err := d.findAncestor(p, latest) // 通过findAncestor来获取共同祖先,以便找到一个开始同步的点
if err != nil {
return err
}
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
d.syncStatsChainOrigin = origin
}
d.syncStatsChainHeight = height
d.syncStatsLock.Unlock()

// Ensure our origin point is below any fast sync pivot point
if mode == FastSync {
if height <= uint64(fsMinFullBlocks) { // 如果对端节点的height小于64,则共同祖先更新为0
origin = 0
} else { // 否则更新pivot为对端节点height-64
pivotNumber := pivot.Number.Uint64()
if pivotNumber <= origin { // 如果pivot小于共同祖先,则更新共同祖先为pivot的前一个
origin = pivotNumber - 1
}
// Write out the pivot into the database so a rollback beyond it will
// reenable fast sync
rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
}
}
d.committed = 1
if mode == FastSync && pivot.Number.Uint64() != 0 {
d.committed = 0
}
if mode == FastSync {
// Set the ancient data limitation.
// If we are running fast sync, all block data older than ancientLimit will be
// written to the ancient store. More recent data will be written to the active
// database and will wait for the freezer to migrate.
//
// If there is a checkpoint available, then calculate the ancientLimit through
// that. Otherwise calculate the ancient limit through the advertised height
// of the remote peer.
//
// The reason for picking checkpoint first is that a malicious peer can give us
// a fake (very high) height, forcing the ancient limit to also be very high.
// The peer would start to feed us valid blocks until head, resulting in all of
// the blocks might be written into the ancient store. A following mini-reorg
// could cause issues.
if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
d.ancientLimit = d.checkpoint
} else if height > fullMaxForkAncestry+1 {
d.ancientLimit = height - fullMaxForkAncestry - 1
} else {
d.ancientLimit = 0
}
frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.

// If a part of blockchain data has already been written into active store,
// disable the ancient style insertion explicitly.
if origin >= frozen && frozen != 0 {
d.ancientLimit = 0
log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1)
} else if d.ancientLimit > 0 {
log.Debug("Enabling direct-ancient mode", "ancient", d.ancientLimit)
}
// Rewind the ancient store and blockchain if reorg happens.
if origin+1 < frozen {
if err := d.lightchain.SetHead(origin + 1); err != nil {
return err
}
}
}
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, mode) // 更新queue的值从共同祖先+1开始,即从共同祖先开始sync区块
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.processHeaders(origin+1, td) },
}
if mode == FastSync { //根据模式的不同,增加新的处理逻辑
d.pivotLock.Lock()
d.pivotHeader = pivot
d.pivotLock.Unlock()

fetchers = append(fetchers, func() error { return d.processFastSyncContent() })
} else if mode == FullSync {
fetchers = append(fetchers, d.processFullSyncContent)
}
return d.spawnSync(fetchers)
}

spawnSync 会给每个 fetcher 启动一个 goroutine, 然后阻塞的检查 fetcher 是否出错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(fetchers []func() error) error {
errc := make(chan error, len(fetchers))
d.cancelWg.Add(len(fetchers))
for _, fn := range fetchers {
fn := fn
go func() { defer d.cancelWg.Done(); errc <- fn() }()
}
// Wait for the first error, then terminate the others.
var err error
for i := 0; i < len(fetchers); i++ {
if i == len(fetchers)-1 {
// Close the queue when all fetchers have exited.
// This will cause the block processor to end when
// it has processed the queue.
d.queue.Close()
}
if err = <-errc; err != nil && err != errCanceled {
break
}
}
d.queue.Close()
d.Cancel()
return err
}

同步 State

state 即世界状态,其保存着所有账户的余额等信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// filedir: go-ethereum-1.10.2\eth\downloader\statesync.go
// stateFetcher manages the active state sync and accepts requests
// on its behalf.
func (d *Downloader) stateFetcher() {
for {
select {
case s := <-d.stateSyncStart:
for next := s; next != nil; {
next = d.runStateSync(next)
}
case <-d.stateCh:
// Ignore state responses while no sync is running.
case <-d.quitCh:
return
}
}
}

runStateSync 函数执行状态同步,直到它完成或请求切换到另一个根哈希:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// runStateSync runs a state synchronisation until it completes or another root
// hash is requested to be switched over to.
func (d *Downloader) runStateSync(s *stateSync) *stateSync {
var (
active = make(map[string]*stateReq) // Currently in-flight requests
finished []*stateReq // Completed or failed requests
timeout = make(chan *stateReq) // Timed out active requests
)
log.Trace("State sync starting", "root", s.root)

defer func() {
// Cancel active request timers on exit. Also set peers to idle so they're
// available for the next sync.
for _, req := range active {
req.timer.Stop()
req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
}
}()
go s.run()
defer s.Cancel()

// Listen for peer departure events to cancel assigned tasks
peerDrop := make(chan *peerConnection, 1024)
peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
defer peerSub.Unsubscribe()

for {
// Enable sending of the first buffered element if there is one.
var (
deliverReq *stateReq
deliverReqCh chan *stateReq
)
if len(finished) > 0 {
deliverReq = finished[0]
deliverReqCh = s.deliver
}

select {
// The stateSync lifecycle:
case next := <-d.stateSyncStart:
d.spindownStateSync(active, finished, timeout, peerDrop)
return next

case <-s.done:
d.spindownStateSync(active, finished, timeout, peerDrop)
return nil

// Send the next finished request to the current sync:
case deliverReqCh <- deliverReq:
// Shift out the first request, but also set the emptied slot to nil for GC
copy(finished, finished[1:])
finished[len(finished)-1] = nil
finished = finished[:len(finished)-1]

// Handle incoming state packs:
case pack := <-d.stateCh:
// Discard any data not requested (or previously timed out)
req := active[pack.PeerId()]
if req == nil {
log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
continue
}
// Finalize the request and queue up for processing
req.timer.Stop()
req.response = pack.(*statePack).states
req.delivered = time.Now()

finished = append(finished, req)
delete(active, pack.PeerId())

// Handle dropped peer connections:
case p := <-peerDrop:
// Skip if no request is currently pending
req := active[p.id]
if req == nil {
continue
}
// Finalize the request and queue up for processing
req.timer.Stop()
req.dropped = true
req.delivered = time.Now()

finished = append(finished, req)
delete(active, p.id)

// Handle timed-out requests:
case req := <-timeout:
// If the peer is already requesting something else, ignore the stale timeout.
// This can happen when the timeout and the delivery happens simultaneously,
// causing both pathways to trigger.
if active[req.peer.id] != req {
continue
}
req.delivered = time.Now()
// Move the timed out data back into the download queue
finished = append(finished, req)
delete(active, req.peer.id)

// Track outgoing state requests:
case req := <-d.trackStateReq:
// If an active request already exists for this peer, we have a problem. In
// theory the trie node schedule must never assign two requests to the same
// peer. In practice however, a peer might receive a request, disconnect and
// immediately reconnect before the previous times out. In this case the first
// request is never honored, alas we must not silently overwrite it, as that
// causes valid requests to go missing and sync to get stuck.
if old := active[req.peer.id]; old != nil {
log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
// Move the previous request to the finished set
old.timer.Stop()
old.dropped = true
old.delivered = time.Now()
finished = append(finished, old)
}
// Start a timer to notify the sync loop if the peer stalled.
req.timer = time.AfterFunc(req.timeout, func() {
timeout <- req
})
active[req.peer.id] = req
}
}
}

同步 Head

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// fetchHead retrieves the head header and prior pivot block (if available) from a remote peer.
func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) {
p.log.Debug("Retrieving remote chain head")
//获取mode值
mode := d.getMode()

// Request the advertised remote head block and wait for the response
latest, _ := p.peer.Head()
fetch := 1
if mode == FastSync {
fetch = 2 // head + pivot headers
}
go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)

ttl := d.requestTTL()
timeout := time.After(ttl)
for {
select {
case <-d.cancelCh:
return nil, nil, errCanceled

case packet := <-d.headerCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
break
}
// Make sure the peer gave us at least one and at most the requested headers
headers := packet.(*headerPack).headers
if len(headers) == 0 || len(headers) > fetch {
return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, len(headers), fetch)
}
// The first header needs to be the head, validate against the checkpoint
// and request. If only 1 header was returned, make sure there's no pivot
// or there was not one requested.
head := headers[0]
if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint)
}
if len(headers) == 1 {
if mode == FastSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
}
p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash())
return head, nil, nil
}
// At this point we have 2 headers in total and the first is the
// validated head of the chian. Check the pivot number and return,
pivot := headers[1]
if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) {
return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks))
}
return head, pivot, nil

case <-timeout:
p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
return nil, nil, errTimeout

case <-d.bodyCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
}
}

处理 Head

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
var (
rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
rollbackErr error
mode = d.getMode()
)
defer func() {
if rollback > 0 {
lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
if mode != LightSync {
lastFastBlock = d.blockchain.CurrentFastBlock().Number()
lastBlock = d.blockchain.CurrentBlock().Number()
}
if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
// We're already unwinding the stack, only print the error to make it more visible
log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
}
curFastBlock, curBlock := common.Big0, common.Big0
if mode != LightSync {
curFastBlock = d.blockchain.CurrentFastBlock().Number()
curBlock = d.blockchain.CurrentBlock().Number()
}
log.Warn("Rolled back chain segment",
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
}
}()
// Wait for batches of headers to process
gotHeaders := false

for {
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled

case headers := <-d.headerProcCh:
// Terminate header processing if we synced up
if len(headers) == 0 {
// Notify everyone that headers are fully processed
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
}
}
// If no headers were retrieved at all, the peer violated its TD promise that it had a
// better chain compared to ours. The only exception is if its promised blocks were
// already imported by other means (e.g. fetcher):
//
// R <remote peer>, L <local node>: Both at block 10
// R: Mine block 11, and propagate it to L
// L: Queue block 11 for import
// L: Notice that R's head and TD increased compared to ours, start sync
// L: Import of block 11 finishes
// L: Sync begins, and finds common ancestor at 11
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if mode != LightSync {
head := d.blockchain.CurrentBlock()
if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
return errStallingPeer
}
}
// If fast or light syncing, ensure promised headers are indeed delivered. This is
// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
// of delivering the post-pivot blocks that would flag the invalid content.
//
// This check cannot be executed "as is" for full imports, since blocks may still be
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if mode == FastSync || mode == LightSync {
head := d.lightchain.CurrentHeader()
if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
}
// Disable any rollback and return
rollback = 0
return nil
}
// Otherwise split the chunk of headers into batches and process them
gotHeaders = true
for len(headers) > 0 {
// Terminate if something failed in between processing chunks
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled
default:
}
// Select the next chunk of headers to import
limit := maxHeadersProcess
if limit > len(headers) {
limit = len(headers)
}
chunk := headers[:limit]

// In case of header only syncing, validate the chunk immediately
if mode == FastSync || mode == LightSync {
// If we're importing pure headers, verify based on their recentness
var pivot uint64

d.pivotLock.RLock()
if d.pivotHeader != nil {
pivot = d.pivotHeader.Number.Uint64()
}
d.pivotLock.RUnlock()

frequency := fsHeaderCheckFrequency
if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1
}
if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
rollbackErr = err

// If some headers were inserted, track them as uncertain
if (mode == FastSync || frequency > 1) && n > 0 && rollback == 0 {
rollback = chunk[0].Number.Uint64()
}
log.Warn("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
// All verifications passed, track all headers within the alloted limits
if mode == FastSync {
head := chunk[len(chunk)-1].Number.Uint64()
if head-rollback > uint64(fsHeaderSafetyNet) {
rollback = head - uint64(fsHeaderSafetyNet)
} else {
rollback = 1
}
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if mode == FullSync || mode == FastSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled
case <-time.After(time.Second):
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunk, origin)
if len(inserts) != len(chunk) {
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
return fmt.Errorf("%w: stale headers", errBadPeer)
}
}
headers = headers[limit:]
origin += uint64(limit)
}
// Update the highest block number we know if a higher one is found.
d.syncStatsLock.Lock()
if d.syncStatsChainHeight < origin {
d.syncStatsChainHeight = origin - 1
}
d.syncStatsLock.Unlock()

// Signal the content downloaders of the availablility of new tasks
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
select {
case ch <- true:
default:
}
}
}
}
}

同步 Body

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
log.Debug("Downloading block bodies", "origin", from)

var (
deliver = func(packet dataPack) (int, error) {
pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
}
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
)
err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

log.Debug("Block body download terminated", "err", err)
return err
}
// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error {
return d.deliver(d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
}
// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
// Update the delivery metrics for both good and failed deliveries
inMeter.Mark(int64(packet.Items()))
defer func() {
if err != nil {
dropMeter.Mark(int64(packet.Items()))
}
}()
// Deliver or abort if the sync is canceled while queuing
d.cancelLock.RLock()
cancel := d.cancelCh
d.cancelLock.RUnlock()
if cancel == nil {
return errNoSyncActive
}
select {
case destCh <- packet:
return nil
case <-cancel:
return errNoSyncActive
}
}
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {

// Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

update := make(chan struct{}, 1)

// Prepare the queue and fetch block parts until the block header fetcher's done
finished := false
for {
select {
case <-d.cancelCh:
return errCanceled

case packet := <-deliveryCh:
deliveryTime := time.Now()
// If the peer was previously banned and failed to deliver its pack
// in a reasonable time frame, ignore its message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
// Deliver the received chunk of data and check chain validity
accepted, err := deliver(packet)
if errors.Is(err, errInvalidChain) {
return err
}
// Unless a peer delivered something completely else than requested (usually
// caused by a timed out request which came through in the end), set it to
// idle. If the delivery's stale, the peer should have already been idled.
if !errors.Is(err, errStaleDelivery) {
setIdle(peer, accepted, deliveryTime)
}
// Issue a log to the user to see what's going on
switch {
case err == nil && packet.Items() == 0:
peer.log.Trace("Requested data not delivered", "type", kind)
case err == nil:
peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
default:
peer.log.Debug("Failed to deliver retrieved data", "type", kind, "err", err)
}
}
// Blocks assembled, try to update the progress
select {
case update <- struct{}{}:
default:
}

case cont := <-wakeCh:
// The header fetcher sent a continuation flag, check if it's done
if !cont {
finished = true
}
// Headers arrive, try to update the progress
select {
case update <- struct{}{}:
default:
}

case <-ticker.C:
// Sanity check update the progress
select {
case update <- struct{}{}:
default:
}

case <-update:
// Short circuit if we lost all our peers
if d.peers.Len() == 0 {
return errNoPeers
}
// Check for fetch request timeouts and demote the responsible peers
for pid, fails := range expire() {
if peer := d.peers.Peer(pid); peer != nil {
// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
// ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
// out that sync wise we need to get rid of the peer.
//
// The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
// and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
if fails > 2 {
peer.log.Trace("Data delivery timed out", "type", kind)
setIdle(peer, 0, time.Now())
} else {
peer.log.Debug("Stalling delivery, dropping", "type", kind)

if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
} else {
d.dropPeer(pid)

// If this peer was the master peer, abort sync immediately
d.cancelLock.RLock()
master := pid == d.cancelPeer
d.cancelLock.RUnlock()

if master {
d.cancel()
return errTimeout
}
}
}
}
}
// If there's nothing more to fetch, wait or terminate
if pending() == 0 {
if !inFlight() && finished {
log.Debug("Data fetching completed", "type", kind)
return nil
}
break
}
// Send a download request to all idle peers, until throttled
progressed, throttled, running := false, false, inFlight()
idles, total := idle()
pendCount := pending()
for _, peer := range idles {
// Short circuit if throttling activated
if throttled {
break
}
// Short circuit if there is no more available task.
if pendCount = pending(); pendCount == 0 {
break
}
// Reserve a chunk of fetches for a peer. A nil can mean either that
// no more headers are available, or that the peer is known not to
// have them.
request, progress, throttle := reserve(peer, capacity(peer))
if progress {
progressed = true
}
if throttle {
throttled = true
throttleCounter.Inc(1)
}
if request == nil {
continue
}
if request.From > 0 {
peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
} else {
peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
fetchHook(request.Headers)
}
if err := fetch(peer, request); err != nil {
// Although we could try and make an attempt to fix this, this error really
// means that we've double allocated a fetch task to a peer. If that is the
// case, the internal state of the downloader and the queue is very wrong so
// better hard crash and note the error instead of silently accumulating into
// a much bigger issue.
panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
}
running = true
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
return errPeersUnavailable
}
}
}
}

同步收据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
log.Debug("Downloading transaction receipts", "origin", from)

var (
deliver = func(packet dataPack) (int, error) {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
}
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
p.SetReceiptsIdle(accepted, deliveryTime)
}
)
err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

log.Debug("Transaction receipt download terminated", "err", err)
return err
}
// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) error {
return d.deliver(d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
}
// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
// Update the delivery metrics for both good and failed deliveries
inMeter.Mark(int64(packet.Items()))
defer func() {
if err != nil {
dropMeter.Mark(int64(packet.Items()))
}
}()
// Deliver or abort if the sync is canceled while queuing
d.cancelLock.RLock()
cancel := d.cancelCh
d.cancelLock.RUnlock()
if cancel == nil {
return errNoSyncActive
}
select {
case destCh <- packet:
return nil
case <-cancel:
return errNoSyncActive
}
}

Content

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
for {
results := d.queue.Results(true)
if len(results) == 0 {
return nil
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
if err := d.importBlockResults(results); err != nil {
return err
}
}
}
// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent() error {
// Start syncing state of the reported head block. This should get us most of
// the state of the pivot block.
d.pivotLock.RLock()
sync := d.syncState(d.pivotHeader.Root)
d.pivotLock.RUnlock()

defer func() {
// The `sync` object is replaced every time the pivot moves. We need to
// defer close the very last active one, hence the lazy evaluation vs.
// calling defer sync.Cancel() !!!
sync.Cancel()
}()

closeOnErr := func(s *stateSync) {
if err := s.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled && err != snap.ErrCancelled {
d.queue.Close() // wake up Results
}
}
go closeOnErr(sync)

// To cater for moving pivot points, track the pivot block and subsequently
// accumulated download results separately.
var (
oldPivot *fetchResult // Locked in pivot block, might change eventually
oldTail []*fetchResult // Downloaded content after the pivot
)
for {
// Wait for the next batch of downloaded data to be available, and if the pivot
// block became stale, move the goalpost
results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
if len(results) == 0 {
// If pivot sync is done, stop
if oldPivot == nil {
return sync.Cancel()
}
// If sync failed, stop
select {
case <-d.cancelCh:
sync.Cancel()
return errCanceled
default:
}
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
// If we haven't downloaded the pivot block yet, check pivot staleness
// notifications from the header downloader
d.pivotLock.RLock()
pivot := d.pivotHeader
d.pivotLock.RUnlock()

if oldPivot == nil {
if pivot.Root != sync.root {
sync.Cancel()
sync = d.syncState(pivot.Root)

go closeOnErr(sync)
}
} else {
results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
}
// Split around the pivot block and process the two sides via fast/full sync
if atomic.LoadInt32(&d.committed) == 0 {
latest := results[len(results)-1].Header
// If the height is above the pivot block by 2 sets, it means the pivot
// become stale in the network and it was garbage collected, move to a
// new pivot.
//
// Note, we have `reorgProtHeaderDelay` number of blocks withheld, Those
// need to be taken into account, otherwise we're detecting the pivot move
// late and will drop peers due to unavailable state!!!
if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) {
log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay))
pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted

d.pivotLock.Lock()
d.pivotHeader = pivot
d.pivotLock.Unlock()

// Write out the pivot into the database so a rollback beyond it will
// reenable fast sync
rawdb.WriteLastPivotNumber(d.stateDB, pivot.Number.Uint64())
}
}
P, beforeP, afterP := splitAroundPivot(pivot.Number.Uint64(), results)
if err := d.commitFastSyncData(beforeP, sync); err != nil {
return err
}
if P != nil {
// If new pivot block found, cancel old state retrieval and restart
if oldPivot != P {
sync.Cancel()
sync = d.syncState(P.Header.Root)

go closeOnErr(sync)
oldPivot = P
}
// Wait for completion, occasionally checking for pivot staleness
select {
case <-sync.done:
if sync.err != nil {
return sync.err
}
if err := d.commitPivotBlock(P); err != nil {
return err
}
oldPivot = nil

case <-time.After(time.Second):
oldTail = afterP
continue
}
}
// Fast sync done, pivot commit done, full import
if err := d.importBlockResults(afterP); err != nil {
return err
}
}
}

小问题

问题: light 节点与 full 节点是如何交互的?
解答: 首先,light 节点会维护多个与 full 节点的 p2p 连接。然后,当 light 节点需要与 full 节点交互时,会将需要发送的请求放到一个请求队列中。light 节点会启动一个 goroutine 不断从请求队列里获取请求,然后从 p2p 节点列表里选一个当前最好用的节点,将请求发出去。请求得到的结果会发到本地数据库和缓存里。

作者:Ashton
链接:https://www.jianshu.com/p/b31c208acaaa
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

转载于以太坊区块同步

参考链接

https://www.jianshu.com/p/427fbc3a25f9

https://blog.csdn.net/pulong0748/article/details/111574388