1. 交易的签名
  2. 理解收据receipt
  3. 理解区块
  4. 理解交易
  5. blockchain核心
  6. 布隆过滤器原理
  7. forkId 解读
  8. oracle 原理和实现
  9. 交易池分析
  10. TxList 解读
  11. MPT树
  12. 区块同步
  13. geth源码学习——介绍
  14. How Geth starts its server

core\txpool.go

前言

  • 这篇文章是博主的朋友写的。
  • 在写这篇文章的时候,笔者已经完完全全看完该代码文件的每一行代码,但是由于代码量过于庞大,所以自然存在边看边忘的情况,所以在此写下这篇文章记录自己的理解以及收获,笔者能力有限,错误之处在所难免,还望包容
  • 由于该文件代码量过大,而且笔者还同时在看该仓库其他的代码文件,所以笔者决定分期写,一边阅读其他的源码文件,一边回顾txPool.go同时写下阅读笔记
  • 而且大部分的理解已经记录在源码文件中了,所以阅读源码中的笔记更能帮助理解

一些重要概念

  1. 可执行交易和非可执行交易可执行交易是指从交易池中择优选出的一部分交易可以被执行,打包到区块中。非可执行交易则相反,任何刚进入交易池的交易均属于非可执行状态,在某一个时刻才会提升为可执行状态。

  1. 本地交易 在交易池中将交易标记为 local 的有多种用途:
    1. 在本地磁盘存储已发送的交易。这样,本地交易不会丢失,重启节点时可以重新加载到交易池,实时广播出去。
    2. 可以作为外部程序和以太坊沟通的一个渠道。外部程序只需要监听文件内容变化,则可以获得交易清单。
    3. local交易可优先于 remote 交易。对交易量的限制等操作,不影响 local 下的账户和交易。

  1. nonce总结:
    1. 以太坊中有两种nonce,一种是在区块中的nonce,主要是调整挖矿难度;一种是每笔交易中nonce
    2. 每个外部账户(私钥控制的账户)都有一个nonce值,从 0 开始连续累加,每累加一次,代表一笔交易。
    3. 某一地址的某一交易的nonce值如果大于当前的nonce,该交易会被放到交易池的queue列表中,直到缺失的nonce被提交到交易池中。
    4. 地址的nonce值是一个连续的整数,起设计的主要目的是防止双花。
    5. 在发生一笔交易时,如果不指定nonce值时,节点会根据当前交易池的交易自动计算该笔交易的nonce。有可能会出现节点 A 和节点 B 计算的nonce值不一样的情况。

  1. 时隙 (slots) 和时段 (epochs)

    1. 信标链是以太坊 2.0 的心脏,它令以太坊系统在和谐与共识中有序运行。每个 slot 为 12 秒,每个 epoch 由 32 个 slots 组成,即 6.4 分钟。

    2. Epoch 0 中的前 32 个 slots,创世区块在 Slot 0 中产生

    3. 在每个 slot 中,在信标链和分片中都可能新增一个区块。我们可以想象,信标链和分片链有序且紧密地排列在一起,当系统在理想情况下运转时,每 12 秒就有一个信标(链)区块和 64 个分片区块产生。验证者大致按照这个时间同步。

    4. 我们可以将一个 slot 看作是区块生成时间,不同的是 slots 内可以没有区块。信标链和分片的创世区块都在 Slot 0 中产生。分片将在信标链 epoch 0 的下一个 epoch 中开始运作,但无论是分片链还是信标链,都有自己的 epoch 0,且包含其创世区块。


一些 go 语言的奇妙用法

  1. ...其实是 go 的一种语法糖。第一个用法主要是用于函数有多个不定参数的情况,可以接受多个不确定数量的参数。第二个用法是 slice 可以被打散进行传递。

    这个是一个关于该语法的一篇博客可以参考: Go 中…的用法


  1. Go语言等待组(sync.WaitGroup)对于我来说是一个几乎不曾见过的 go 语言的语法知识,所以在此进行查找记录一下,个人理解:该等待组在本 go 语言程序中的作用是调用wg.Wait()时阻塞使得等在组里面的所有的 go 协程都运行完毕,然后才恢复,也是一种同步携程的方法。具体请看这篇文章:sync 包——WaitGroup

关于交易中的nonce的深入剖析:

参考文章: 1. 一文讲清楚以太坊的 nonce 2. 以太坊交易中的 Nonce 详解


交易池源码解析 (core/txpool.go)

前提参数

我们可以通过源码看到前面定义了一大堆参数,初看时毫无头绪,但是等你将>这两千行代码完整的看完之时,你基本上就可以理解大部分参数的含义了

  • 以下的这些参数我还不理解,等会补充:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const (
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
chainHeadChanSize = 10

// txSlotSize is used to calculate how many data slots a single transaction
// takes up based on its size. The slots are used as DoS protection, ensuring
// that validating a new transaction remains a constant operation (in reality
// O(maxslots), where max slots are 4 currently).
txSlotSize = 32 * 1024//32KB

// txMaxSize is the maximum size a single transaction can have. This field has
// non-trivial consequences: larger transactions are significantly harder and
// more expensive to propagate; larger transactions also take more resources
// to validate whether they fit into the pool or not.
txMaxSize = 4 * txSlotSize // 128KB
)
  • 一些错误处理的error变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
var (
// ErrAlreadyKnown is returned if the transactions is already contained
// within the pool.

//交易池中已经存在的 错误处理
ErrAlreadyKnown = errors.New("already known")

// ErrInvalidSender is returned if the transaction contains an invalid signature.
//无效签名的 错误处理
ErrInvalidSender = errors.New("invalid sender")

// ErrUnderpriced is returned if a transaction's gas price is below the minimum
// configured for the transaction pool.
//定价过低
ErrUnderpriced = errors.New("transaction underpriced")

// ErrTxPoolOverflow is returned if the transaction pool is full and can't accpet
// another remote transaction.
//交易池已经满了
ErrTxPoolOverflow = errors.New("txpool is full")

// ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")

// ErrGasLimit is returned if a transaction's requested gas limit exceeds the
// maximum allowance of the current block.
ErrGasLimit = errors.New("exceeds block gas limit")

// ErrNegativeValue is a sanity error to ensure no one is able to specify a
// transaction with a negative value.
ErrNegativeValue = errors.New("negative value")

// ErrOversizedData is returned if the input data of a transaction is greater
// than some meaningful limit a user might use. This is not a consensus error
// making the transaction invalid, rather a DOS protection.
ErrOversizedData = errors.New("oversized data")
)
  • 度量参数(或者说是计数器)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
var (
// Metrics for the pending pool
//待处理池
pendingDiscardMeter = metrics.NewRegisteredMeter("txpool/pending/discard", nil)
pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil)
pendingRateLimitMeter = metrics.NewRegisteredMeter("txpool/pending/ratelimit", nil) // Dropped due to rate limiting
pendingNofundsMeter = metrics.NewRegisteredMeter("txpool/pending/nofunds", nil) // Dropped due to out-of-funds

// Metrics for the queued pool

//排队池
queuedDiscardMeter = metrics.NewRegisteredMeter("txpool/queued/discard", nil)
queuedReplaceMeter = metrics.NewRegisteredMeter("txpool/queued/replace", nil)
queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
queuedNofundsMeter = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil) // Dropped due to out-of-funds
queuedEvictionMeter = metrics.NewRegisteredMeter("txpool/queued/eviction", nil) // Dropped due to lifetime

// General tx metrics
knownTxMeter = metrics.NewRegisteredMeter("txpool/known", nil)
validTxMeter = metrics.NewRegisteredMeter("txpool/valid", nil)
invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil)
underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
overflowedTxMeter = metrics.NewRegisteredMeter("txpool/overflowed", nil)
// throttleTxMeter counts how many transactions are rejected due to too-many-changes between
// txpool reorgs.
throttleTxMeter = metrics.NewRegisteredMeter("txpool/throttle", nil)
// reorgDurationTimer measures how long time a txpool reorg takes.
reorgDurationTimer = metrics.NewRegisteredTimer("txpool/reorgtime", nil)
// dropBetweenReorgHistogram counts how many drops we experience between two reorg runs. It is expected
// that this number is pretty low, since txpool reorgs happen very frequently.
dropBetweenReorgHistogram = metrics.NewRegisteredHistogram("txpool/dropbetweenreorg", nil, metrics.NewExpDecaySample(1028, 0.015))

pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)

reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)
)

交易池配置

交易池配置不多,但每项配置均直接影响交易池对交易的处理行为。配置信息由 TxPoolConfig 所定义,相关的解释已经在源码中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// TxPoolConfig are the configuration parameters of the transaction pool.

//交易池配置(以太坊 geth 节点允许在启动节点时,通过参数修改以下配置)
type TxPoolConfig struct {
//定义了一组视为local交易的账户地址。
//任何来自此清单的交易默认均被视为 local 交易
Locals []common.Address // Addresses that should be treated by default as local

//是否禁止local交易处理。默认为 fasle,允许 local 交易。
//如果禁止,则来自 local 的交易均视为 remote 交易处理
NoLocals bool // Whether local transaction handling should be disabled

//存储local交易记录的文件名,默认是 ./transactions.rlp
Journal string // Journal of local transactions to survive node restarts

//定期将local交易存储文件中的时间间隔。默认为每小时一次
Rejournal time.Duration // Time interval to regenerate the local transaction journal

// remote交易进入交易池的最低 Price 要求。此设置对 local 交易无效。默认值1
PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool

//替换交易时所要求的价格上调涨幅比例最低要求。任何低于要求的替换交易均被拒绝。
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)

//当交易池中可执行交易(是已在等待矿工打包的交易)量超标时,允许每个账户可以保留在交易池最低交易数。默认值是 16 笔。
AccountSlots uint64 // Number of executable transaction slots guaranteed per account

//交易池中所允许的可执行交易量上限,高于上限时将释放部分交易。默认是 4096 笔交易。
GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts

//交易池中单个账户 非可执行交易 上限,默认是64笔。
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account

//交易池中所有非可执行交易上限,默认 1024 笔。
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

// 允许 remote 的非可执行交易可在交易池存活的最长时间。
//交易池每分钟检查一次,一旦发现有超期的remote 账户,则移除该账户下的所有非可执行交易。默认为3小时。
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}

以上有解释过的默认配置(如: PriceLimit=1 PriceBump=10)在下面已经被定义成常量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var DefaultTxPoolConfig = TxPoolConfig{
Journal: "transactions.rlp",
Rejournal: time.Hour,

PriceLimit: 1,
PriceBump: 10,

AccountSlots: 16,
GlobalSlots: 4096 + 1024, // urgent + floating queue capacity with 4:1 ratio
AccountQueue: 64,
GlobalQueue: 1024,

//默认的remote的生命周期 交易池每隔一分钟检查一次
Lifetime: 3 * time.Hour,
}

该结构体被用于Txpool中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
//配置信息
config TxPoolConfig
//链配置
chainconfig *params.ChainConfig //param(中文->参数)
//当前链
chain blockChain
//最低的gas价格
gasPrice *big.Int
//通过txFedd订阅TxPool的信息
txFeed event.Feed
//提供了同时取消多个订阅的功能
scope event.SubscriptionScope

//对事物进行签名处理
signer types.Signer
//读写互斥锁
mu sync.RWMutex

istanbul bool // Fork indicator whether we are in the istanbul stage.
eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions.
eip1559 bool // Fork indicator whether we are using EIP-1559 type transactions.

//区块链头部的当前状态
currentState *state.StateDB // Current state in the blockchain head
pendingNonces *txNoncer // Pending state tracking virtual nonces
currentMaxGas uint64 // Current gas limit for transaction caps

locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk

//可执行队列
pending map[common.Address]*txList // All currently processable transactions
//排队队列
queue map[common.Address]*txList // Queued but non-processable transactions
//每个账户对应的最后一笔交易进入的pending队列的时刻
beats map[common.Address]time.Time // Last heartbeat from each known account
//储存全部的交易
all *txLookup // All transactions to allow lookups
//所有按价格排序的交易
priced *txPricedList // All transactions sorted by price

//当有了新的区块的产生会收到消息,订阅区块头消息
chainHeadCh chan ChainHeadEvent
//区块头消息订阅器
chainHeadSub event.Subscription
reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
}

就像上面的交易配置,再结合下面的图像:

我们可以发现,以太坊将交易按状态分为两部分:可执行交易和非可执行交易。分别记录在pending容器中和 queue容器,交易池先采用一个 txLookup (内部为 map)跟踪所有交易。同时将交易根据本地优先,价格优先原则将交易划分为两部分 queue pending。而这两部交易则按账户分别跟踪

Txpool初始化

**func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool **

1. 检查配置,配置有问题的话就用默认配置初始化

config = (&config).sanitize()


2. 初始化本地账户:

pool.locals = newAccountSet(pool.signer)

相关函数为详情为:

1
2
3
4
5
6
7
8
9
10
11
12
func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
as := &accountSet{
accounts: make(map[common.Address]struct{}),
signer: signer,
}
//逐个加入地址
for _, addr := range addrs {
as.add(addr)
}
//返回创建的AccountSet
return as
}

3. 将配置的本地账户地址加入进去

pool.locals.add(addr)

我们在安装以太坊客户端可以指定一个数据存储目录,此目录便会存储着所有我们导入的或者通过本地客户端创建的帐户 keystore 文件。而这个加载过程便是从该目录加载帐户数据


4. 更新交易池:

1
2
//reset检索区块链的当前状态,并确保交易池的内容相对于链状态有效。
pool.reset(nil, chain.CurrentBlock().Header())

更新交易池使用的reset函数非常关键,我们要进行讲解:

首先是reset函数的目的是:

  • 对应oldHead=nil的情况时:

    一般发生在刚创建交易池的时候,我们会用chain.CurrentBlock().Header()(就是当前的区块头)来进行替换,说是复制也可以,达到reset函数的目的;

  • 对应oldHead!=nil的情况时:
    发生原因: 由于以太坊是分布式系统,当本地节点已经挑选出最优的交易,准备广播给整个网络,这个时候矿工已经打包了一个区块,本地节点的区块头就是旧的了,本地筛选的交易有可能已经被打包,如果已经被打包生成了新区块,再将这个交易广播已经没有任何的意义,甚至我们费尽心思准备好的 pending 缓冲区里的交易都是无效的。
    解决方法: 为了避免以上的情况发生我们就需要监听链是否有新区块产生,也就是ChainHeadEvent事件(相关调用的函数为runReorg函数),监听到之后就要回退,现在这里不是我们这讨论的范畴;
    具体代码就是这样完成的:

    1
    2
    3
    // Subscribe events from blockchain and start the main event loop.
    //在交易池启动后,将订阅链的区块头事件
    pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
  • pool.wg.Add(1)关于这行代码的解释请看 go 语言语法知识go语言等待组那一块,这里不再讲解;

  • go pool.scheduleReorgLoop()文件给出的解释是这样的,与后面加载本地日志的操作相呼应 Start the reorg loop early so it can handle requests generated during journal loading.

  • 如果本地交易开启 那么从本地磁盘加载本地交易:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 // If local transactions and journaling is enabled, load from disk
//允许local交易 并且储存了journal
if !config.NoLocals && config.Journal != "" {
//在启动交易池时根据配置开启本地交易存储能力
//主要是从config中读出
pool.journal = newTxJournal(config.Journal)

//load 从磁盘加载已有交易到交易池中。
if err := pool.journal.load(pool.AddLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
//对 journal 文件执行 rotate,将交易池中的本地交易写入journal文件,并丢弃旧数据。
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
  • 开启主循环
1
2
3
4
5
6
//启动事件循环并返回
pool.wg.Add(1)

//是txPool的一个goroutine.也是主要的事件循环.(下一个函数)
//等待和响应外部区块链事件以及各种报告和交易驱逐事件
go pool.loop()

到此初始化结束


添加交易到交易池

交易池中交易的来源一方面是其他节点广播过来的,一方面是本地提交的,追根到源代码一个是AddLocal(),一个是AddRemote(),不管哪个都会调用addTxs()。所以我们对添加交易的讨论就会从这个函数开始,然后逐步引出全局

先看下面这一张图简要说明一下操作的流程:


  1. 首先是遍历整个交易map(txs),将其中已经存在的和无效签名的交易过滤出去,注意同时其中要进行相关数据的记录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
for i, tx := range txs {
// If the transaction is known, pre-set the error slot
//使用的Get()很简单就不多言
if pool.all.Get(tx.Hash()) != nil {
errs[i] = ErrAlreadyKnown
knownTxMeter.Mark(1)
continue
}
// Exclude transactions with invalid signatures as soon as
// possible and cache senders in transactions before
// obtaining lock
_, err := types.Sender(pool.signer, tx)
//错误签名
if err != nil {
errs[i] = ErrInvalidSender
invalidTxMeter.Mark(1)
continue
}
// Accumulate all unknown transactions for deeper processing
news = append(news, tx)
}
//如果排查完之后news里面没有交易
if len(news) == 0 {
return errs
}

  1. 将交易进行添加的操作:
1
2
3
4
//上锁防止冲突
pool.mu.Lock()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
pool.mu.Unlock()

于是乎我们就必须进入addTxsLocked()函数中去了解其进行的操作,代码不长,于是全部放在下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// addTxsLocked attempts to queue a batch of transactions if they are valid.
// The transaction pool lock must be held.
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
dirty := newAccountSet(pool.signer)
errs := make([]error, len(txs))
for i, tx := range txs {
replaced, err := pool.add(tx, local)
errs[i] = err
//加入交易池成功并且没有替换老交易
if err == nil && !replaced {
dirty.addTx(tx)
}
}
validTxMeter.Mark(int64(len(dirty.accounts)))
return errs, dirty
}

我们发现除了一些初始化和计数器的操作,还有一个至关重要的add()函数,它是将交易添加到queue中,等待后面的promote,到pending中去。如果在queue或者pending中已经存在,并且它的``gas price`更高时,将覆盖之前的交易。我们来了解一下该函数的具体操作步骤:

以下按照源码顺序编写,所以不贴源码了,请对照源码观看

  1. 过滤交易池中已有的交易,记住是通过hash值进行的判断,因为即便两笔交易nonce值一样,hash值也断然不会相同;

  2. 判断local标记,并进行共识性验证validateTx()

    validateTx: 主要做了以下几件事
    [ ]交易大小不能超过 32kb

    • 交易金额不能为负
    • 交易 gas 值不能超出当前交易池设定的 gaslimit
    • 交易签名必须正确
    • 如果交易为远程交易,则需验证其 gasprice 是否小于交易池 gasprice 最小值,如果是本地,优先打包,不管 gasprice
    • 判断当前交易 nonce 值是否过低
    • 交易所需花费的转帐手续费是否大于帐户余额 cost == V + GP * GL
    • 判断交易花费 gas 是否小于其预估花费 gas
  3. 如果交易池已满,丢弃价格过低的交易,注意这边的GlobalSlotsGlobalQueue ,就是我们说的pendingqueue的最大容量,如果交易池的交易数超过两者之和,就要丢弃价格过低的交易。

  4. 进入一个重要的if语句进行判断:

    • 判断当前交易在pending队列中是否存在nonce值相同的交易。存在则判断当前交易所设置的gasprice是否超过设置的PriceBump(为10)百分比,超过则替换覆盖已存在的交易old==nilnil,否则报错返回false和错误信息替换交易gasprice过低;无论如何返回,该函数都已经在此处退出。
    • 不存在的话就把它扔到queue队列中(通过enqueueTx()函数)。
  5. 对该笔交易进行一些local的操作。

  6. 交易晋升

    接着我们还是回到addTxs()这个函数中来,我们发现我们又是遇上一个极为重要的函数 requestPromoteExecutables(),下面进行相关的剖析: