1. 交易的签名
  2. 理解收据receipt
  3. 理解区块
  4. 理解交易
  5. blockchain核心
  6. 布隆过滤器原理
  7. forkId 解读
  8. TxList 解读
  9. oracle 原理和实现
  10. 交易池分析
  11. MPT树
  12. 区块同步
  13. geth源码学习——介绍
  14. How Geth starts its server

介绍

布隆过滤器(英语:Bloom Filter)是 1970 年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。

可见,它解决的核心问题是 检索一个元素是否在一个集合中。原理大致如下:

当一个元素被加入集合时,通过 K 个散列函数将这个元素映射成一个二进制数组中的 K 个位置,把这些位置的值设置为 1。检索时,我们只要观察这些对应的位置的值是不是都是 1 就(大约)知道集合中有没有检索的元素:如果这 K 个位置中有任何一个 0,则被检索元素一定不在集合中;如果都是 1,则被检元素很可能在。这就是布隆过滤器的基本思想。有个在线网站可以玩耍。

img

以太坊如何使用布隆过滤器

事件是以太坊给外部应用程序发送消息的重要方式,应用为了获取自己需要的信息,需要在许多事件中快速的检索。虽然把数据都写入存储,依靠存储中的哈希索引,可以快速检索,但是以太坊的存储空间的计算代价很高,不可能用来存储大量的交易日志、事件等重复性很高的信息。布隆过滤器就是用来解决快速检索的问题。

当生成区块时,布隆过滤器中包含触发事件的合约的地址、事件中的 indexed 的字段。然后,布隆过滤器会包含在区块头中,同时实际日志和事件的数据不包含在区块中,只保留了日志和事件的检索方式。

外部的应用程序监听事件时,可以快速扫描区块头中的布隆过滤器,找到特定合约地址和其中的 indexed 字段,查找满足条件的事件。

源码的实现与原理

core/types/bloom9.go

布隆过滤器的定义

1
2
3
4
5
6
7
8
9
10
const (
// BloomByteLength represents the number of bytes used in a header log bloom.
BloomByteLength = 256

// BloomBitLength represents the number of bits used in a header log bloom.
BloomBitLength = 8 * BloomByteLength //2048 位
)

// Bloom represents a 2048 bit bloom filter.
type Bloom [BloomByteLength]byte

可见,2048 位作为一个区块头的布隆过滤器。

添加元素

字节数组转换成布隆过滤器,从末尾开始数,替换布隆过滤器的 d 个字节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// BytesToBloom converts a byte slice to a bloom filter.
// It panics if b is not of suitable size.
func BytesToBloom(b []byte) Bloom {
var bloom Bloom
bloom.SetBytes(b)
return bloom
}
// SetBytes sets the content of b to the given bytes.
// It panics if d is not of suitable size.
func (b *Bloom) SetBytes(d []byte) {
if len(b) < len(d) {
panic(fmt.Sprintf("bloom bytes too big %d %d", len(b), len(d)))
}
copy(b[BloomByteLength-len(d):], d)
}

实际上,除了上面提到的替代字节的方法,也有真的类似于 append 的方法,叫做 Add,它实际上是选择 2048 位中的三个位置,置为 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
// Add adds d to the filter. Future calls of Test(d) will return true.
func (b *Bloom) Add(d []byte) {
b.add(d, make([]byte, 6))
}

// add is internal version of Add, which takes a scratch buffer for reuse (needs to be at least 6 bytes)
func (b *Bloom) add(d []byte, buf []byte) {
i1, v1, i2, v2, i3, v3 := bloomValues(d, buf)
b[i1] |= v1
b[i2] |= v2
b[i3] |= v3
}

上面出现了比较重要的函数 bloomValues,他会选出 3 个字节,设置它们的值。

首先,选取索引为 1,3,5 的位置的字节,然后与 0000 0111 相与。假如索引为 1 的字节为 10110101,那么 1011 0101 & 0000 0111=0000 0101 也就是选择后三位的值。然后将 1 移 5 位,也即为 0010 0000。这样就将某个字节的 8 位中的某一位设置成 1。

接着来选择这个字节所在的位置,将哈希值的末尾和 0111 1111 1111 相与,然后按照大端的方式,将这末尾的 16 位 转换成 uint16 的值,接着向做左移动 3 位,这样 最大为 FF=255,这样恰好在布隆过滤器的长度范围内。

这样,通过字节内的移位和选择字节的位置,我们就巧妙地伪随机地将 2048 位中的三个 bit 设置为 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// bloomValues returns the bytes (index-value pairs) to set for the given data
func bloomValues(data []byte, hashbuf []byte) (uint, byte, uint, byte, uint, byte) {
sha := hasherPool.Get().(crypto.KeccakState) //选择 keccak-256 的哈希算法
//哈希函数的用法,重置缓冲区、写入需要哈希的数据、读取需要哈希的数据
sha.Reset()
sha.Write(data)
sha.Read(hashbuf)
hasherPool.Put(sha)
// The actual bits to flip
//选取索引为 1,3,5 的位置的字节,然后与0000 0111相与。
//假如索引为 1 的字节为 10110101,那么 1011 0101 & 0000 0111=0000 0101 也就是选择后三位的值。
//然后将 1 移 5位,也即为 0010 0000
v1 := byte(1 << (hashbuf[1] & 0x7))
v2 := byte(1 << (hashbuf[3] & 0x7))
v3 := byte(1 << (hashbuf[5] & 0x7))
// The indices for the bytes to OR in
//将哈希值的末尾和 0111 1111 1111 相与,然后按照大端的方式,将这末尾的 16 位 转换成 uint16 的值,
//接着向做左移动 3 位,这样 最大为FF=255,这样恰好在布隆过滤器的长度范围内。
i1 := BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf)&0x7ff)>>3) - 1
i2 := BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[2:])&0x7ff)>>3) - 1
i3 := BloomByteLength - uint((binary.BigEndian.Uint16(hashbuf[4:])&0x7ff)>>3) - 1

return i1, v1, i2, v2, i3, v3
}

检查元素

源码中设置了检查某个元素是否在布隆过滤器中的方法,简单的比较是否对应的字节内的序列相同。

1
2
3
4
5
6
7
// Test checks if the given topic is present in the bloom filter
func (b Bloom) Test(topic []byte) bool {
i1, v1, i2, v2, i3, v3 := bloomValues(topic, make([]byte, 6))
return v1 == v1&b[i1] &&
v2 == v2&b[i2] &&
v3 == v3&b[i3]
}

设置布隆过滤器

可以看出来,布隆过滤器以相同的方式,记录触发日志的合约的地址和日志数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// CreateBloom creates a bloom filter out of the give Receipts (+Logs)
func CreateBloom(receipts Receipts) Bloom {
buf := make([]byte, 6) //因为哈希后的数据只需要前面 6 个字节
var bin Bloom
for _, receipt := range receipts {
for _, log := range receipt.Logs {
bin.add(log.Address.Bytes(), buf) //添加日志地址
for _, b := range log.Topics {
bin.add(b[:], buf) //添加日志
}
}
}
return bin
}
// LogsBloom returns the bloom bytes for the given logs
func LogsBloom(logs []*Log) []byte {
buf := make([]byte, 6)
var bin Bloom
for _, log := range logs {
bin.add(log.Address.Bytes(), buf)
for _, b := range log.Topics {
bin.add(b[:], buf)
}
}
return bin[:]
}

core/bloombits/generator.go

geth 中布隆过滤器的实现分成了三个文件, generator 生成布隆过滤器,matcher 用来匹配查询操作,scheduler 用于调度对单个 bit 值检索进行。

Generator 的定义

首先需要注意:查询和聚合 bloom 的操作并不是以一个区块为最小单位,而是以 section 为最小单位,也就是 4096 个区块。然后,每个 section 的布隆过滤器会聚合在一起,构成 Generator 的结构:

1
2
3
4
5
6
7
8
9
10
// Generator takes a number of bloom filters and generates the rotated bloom bits
// to be used for batched filtering.
type Generator struct {
//转置后的 bloom 数据,实际上为 2048*(sections/8) 的矩阵
blooms [types.BloomBitLength][]byte // Rotated blooms for per-bit matching
//段的个数,也是布隆过滤器的个数
sections uint // Number of sections to batch together
//当前批量处理中的下一个将处理的段,也就是下一个 bloom
nextSec uint // Next section to set when adding a bloom
}

这里有个需要注意的地方:blooms [types.BloomBitLength][]byte 究竟是什么。它是多个 bloom 聚合在一起的集合,但是他的组织形式做了调整。这里,许多博客都有错误,比较让人误解。

在布隆过滤器的定义中,我们知道只有 3 个位置置为 1,如果挨个区块的检索,那么不断切换区块再通过哈希计算得到这三个位置,效率不高。因此,多个 bloom 聚合的矩阵进行了转置。这样检索的时候首先计算三个哈希后的位置 i, j, k,然后直接检索 4096 个区块的中i 位置是否为 1,进行第一轮排除,其他依次进行。

例如:

1
2
3
4
5
6
7
8
9
10
原矩阵:
[A0, A1, ..., A2047]
[B0, B1, ..., B2047]
...
[H0, H1, ..., H2047]
转置后:
[A0, B0, ..., H0]
[A1, B1, ..., H1]
...
[A2047, B2047, ..., H2047]

原矩阵中的 0 到 2048 是一个 section 的布隆过滤器,然后多行是多个 section 的布隆过滤器。转置之后,第一列是第一个 section 的布隆过滤器,第一行表示是矩阵中的所有 sections 的索引为 0 的比特向量。其他的以此类推。

sections 表示这个矩阵中有多少个 section,也就是多少个布隆过滤器。nextSec 表示下一个 section,也就是在添加 bloom 进矩阵时将要处理的 section。它相当于一个计数器,等于 sections 时表示完成了所有的添加操作。

新建 Generator

上面的理论介绍在代码中的实现略有技巧,首先转置后 blooms 有 2048 行是显然的,然后它的列的设置是以字节为单位的,一个字节有八位,可以存储 8 个 bloom 的比特向量在某个索引的值。因此,我们需要 sections 是 8 的倍数,恰好填满字节数组,同时字节数组的长度只要是 sections 的八分之一即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// NewGenerator creates a rotated bloom generator that can iteratively fill a
// batched bloom filter's bits.
func NewGenerator(sections uint) (*Generator, error) {
//段的数量需要是 8 的倍数,这样可以恰好按比特填充到字节数组里。
if sections%8 != 0 {
return nil, errors.New("section count not multiple of 8")
}
b := &Generator{sections: sections}
//请注意转置矩阵。这里因为一个字节占 8 位,而这里使用 byte 数组,因此只要占 1/8 的位置
for i := 0; i < types.BloomBitLength; i++ {
b.blooms[i] = make([]byte, sections/8)
}
return b, nil
}

添加 bloom 进 Generator

理解了 Generator 的数据结构后就轻松了,为了在字节数组中添加 bloom 的二进制序列,首先要找到在字节数组中的哪个索引,然后找到字节中需要置位的比特的索引。接着,八位一组的将对应位置设置为 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// AddBloom takes a single bloom filter and sets the corresponding bit column
// in memory accordingly.
func (b *Generator) AddBloom(index uint, bloom types.Bloom) error {
// Make sure we're not adding more bloom filters than our capacity
if b.nextSec >= b.sections {
//超过了设定的批量处理的段的个数
return errSectionOutOfBounds
}
//index 与下个将处理的段对应
if b.nextSec != index {
return errors.New("bloom filter with unexpected index")
}

// Rotate the bloom and insert into our collection
byteIndex := b.nextSec / 8 //根据下一个 section 的编号找到字节数组中的索引
//在一个字节中的比特的索引,注意一个字节中存有 8 sections 的
// bloom 的比特向量的某一位
bitIndex := byte(7 - b.nextSec%8)
//开始初始化 每一列对应的 bloom
for byt := 0; byt < types.BloomByteLength; byt++ {
//八位一组,大端序
bloomByte := bloom[types.BloomByteLength-1-byt]
if bloomByte == 0 {
continue
}
base := 8 * byt
b.blooms[base+7][byteIndex] |= ((bloomByte >> 7) & 1) << bitIndex
b.blooms[base+6][byteIndex] |= ((bloomByte >> 6) & 1) << bitIndex
b.blooms[base+5][byteIndex] |= ((bloomByte >> 5) & 1) << bitIndex
b.blooms[base+4][byteIndex] |= ((bloomByte >> 4) & 1) << bitIndex
b.blooms[base+3][byteIndex] |= ((bloomByte >> 3) & 1) << bitIndex
b.blooms[base+2][byteIndex] |= ((bloomByte >> 2) & 1) << bitIndex
b.blooms[base+1][byteIndex] |= ((bloomByte >> 1) & 1) << bitIndex
b.blooms[base][byteIndex] |= (bloomByte & 1) << bitIndex
}
b.nextSec++
return nil
}

最后,有一个检索的函数,用于返回 blooms 的某一行,注意这不是一个布隆过滤器,而是连续的若干个 section 对应的 bloom 的索引为 idx 构成的集合

1
2
3
4
5
6
7
8
9
10
11
12
// Bitset returns the bit vector belonging to the given bit index after all
// blooms have been added.
func (b *Generator) Bitset(idx uint) ([]byte, error) {
//因为 nextSec 递增,可以表示是否完成了给 Generator 的赋值
if b.nextSec != b.sections {
return nil, errors.New("bloom not fully generated yet")
}
if idx >= types.BloomBitLength {
return nil, errBloomBitOutOfBounds
}
return b.blooms[idx], nil
}

core/bloom bits/sheduler.go

sheduler 主要是用于调度检索任务,是检索任务的调度器,也可以删除重复数据、缓存结果,降低 IO 消耗。

数据结构定义

以太坊的布隆过滤器总共有 2048 位,以太坊会把若干个区块分成段,段作为检索的基本单位,4096 个区块为一段。checkpoint 和 时间检索也是这样。下面是一个检索请求,表示在特定的一段 section 中匹配 2048 位的过滤器中的哪一位。

1
2
3
4
5
6
// request represents a bloom retrieval task to prioritize and pull from the local
// database or remotely from the network.
type request struct {
section uint64 // Section index to retrieve the a bit-vector from
bit uint // Bit index within the section to retrieve the vector of
}

response 和上面的 request 对应,注意一段对应一个 response,检索任务以段(section) 为最小单位,而不是区块高度,表示被检索的 bit 向量的状态(即请求的状态)。cached 缓存检索结果,用于去重。done 表示请求是否完成。

1
2
3
4
5
// response represents the state of a requested bit-vector through a scheduler.
type response struct {
cached []byte // Cached bits to dedup multiple requests
done chan struct{} // Channel to allow waiting for completion
}

调度器的定义如下:scheduler 是查询某一段区块的布隆过滤器的 bit 向量中某一位的任务调度器。bit 表示查询 2048 位中哪一位;response 表示这个某个请求结构,一般而言会包含所在的一段,有 4096 个 请求结果 的键值对。在调度的同时,scheduler 会实现去重和缓存结果的功能。

1
2
3
4
5
6
7
8
9
10
// scheduler handles the scheduling of bloom-filter retrieval operations for
// entire section-batches belonging to a single bloom bit. Beside scheduling the
// retrieval operations, this struct also deduplicates the requests and caches
// the results to minimize network/database overhead even in complex filtering
// scenarios.
type scheduler struct {
bit uint // Index of the bit in the bloom filter this scheduler is responsible for
responses map[uint64]*response // Currently pending retrieval requests or already cached responses
lock sync.Mutex // Lock protecting the responses from concurrent access
}

执行调度任务

接下来看如何执行调度操作,这需要了解 Golang 的并发。 run 函数开始执行流水线式的调度任务,并且会返回结果。

  • sections chan uint64 表示调度任务属于哪一段。
  • dist chan *request 表示调度的输入,输入可以是本地的检索请求,可以是来自网络的检索请求。
  • done chan []byte 表示输出的结果,用字节数组表示。
  • quit chan struct{} 是空结构体,通过阻塞控制,表示调度任务是否完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// run creates a retrieval pipeline, receiving section indexes from sections and
// returning the results in the same order through the done channel. Concurrent
// runs of the same scheduler are allowed, leading to retrieval task deduplication.
func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
//请求和回应之间的缓冲通道,用于阻塞和控制。

// Create a forwarder channel between requests and responses of the same size as
// the distribution channel (since that will block the pipeline anyway).
pend := make(chan uint64, cap(dist))

// Start the pipeline schedulers to forward between user -> distributor -> user
wg.Add(2)
go s.scheduleRequests(sections, dist, pend, quit, wg)
go s.scheduleDeliveries(pend, done, quit, wg)
}

首先 pend 变量是用作阻塞控制的中间变量,它可以控制 requestresponse 的协调调度。这里再次强调,调度器可以接受外部的请求,会并发地同时处理网络地检索请求和用户自身的检索请求。

scheduleRequests 方法主要是将调度器检索的段 reqs chan uint64 ,封装到 dist chan *request ,然后初始化 responsepend 将会接收 reqs chan uint64 的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// scheduleRequests reads section retrieval requests from the input channel,
// deduplicates the stream and pushes unique retrieval tasks into the distribution
// channel for a database or network layer to honour.
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) {
// Clean up the goroutine and pipeline when done
defer wg.Done()
defer close(pend)

// Keep reading and scheduling section requests
//一直将 section 封装到 requests,直到收到 quit 信号
for {
select {
case <-quit: //收到退出信号就返回
return
case section, ok := <-reqs:
//如果没有收到退出信号,继续初始化 responses,
//将封装了段高度的请求传入 dist,再把 段高度传入 pend

// New section retrieval requested
if !ok {
return
}
// Deduplicate retrieval requests
unique := false //因为并发执行时可能一已经进入了协程,这里用于去重

//阻塞其他协程,直到这一部分完成,Unlock
s.lock.Lock()
//如果请求为空,那么设置为已完成,避免重复执行
if s.responses[section] == nil {
s.responses[section] = &response{
done: make(chan struct{}),
}
unique = true
}
s.lock.Unlock()

// Schedule the section for retrieval and notify the deliverer to expect this section
if unique {
//如果对应的请求为空,但是还没有在其他协程结束,那么把结果分发出去
select {
case <-quit:
return
case dist <- &request{bit: s.bit, section: section}:
}
}
//如果还没有结束,那么给 pend 赋值,pend 进入阻塞状态。
select {
case <-quit:
return
case pend <- section:
}
}
}
}

接着,scheduleDeliveries 方法接收表示段高度的 pend,然后被 response[section].done 阻塞,直到外部调用 deliver 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// scheduleDeliveries reads section acceptance notifications and waits for them
// to be delivered, pushing them into the output data buffer.
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
// Clean up the goroutine and pipeline when done
defer wg.Done()
defer close(done)

// Keep reading notifications and scheduling deliveries
for {
select {
case <-quit:
return
//结束 pend 的阻塞
case idx, ok := <-pend:
//如果没有收到退出信号,那么将每段的缓存写入 res,并且标注这一段的检索已经完成,
//接着将缓存传递给 done,done 会被阻塞,直到后面的 deliver 执行。

// New section retrieval pending
if !ok {
return
}
// Wait until the request is honoured
s.lock.Lock()
res := s.responses[idx] //写入 response,非常关键的一步。
s.lock.Unlock()

select {
case <-quit:
return
case <-res.done: //这个为了阻塞,指代外部传入值才能解除阻塞
}
// Deliver the result
select {
case <-quit:
return
case done <- res.cached: //如果没有结束,将缓存的写入 done
}
}
}
}

核心逻辑

最后的 deliver 函数的参数 data 可以看作是 [sections][]byte,用二维数组表示这次调度的每一段对应的结果。当外部的匹配器完成了工作,就会传入 data,给每一段的 response 的结果赋值,也就是给 cached 赋值。这样解除了 scheduleDeliveries 的阻塞,将结果顺利的赋值给 done,执行调度任务的 run 方法也可以顺利的将结果从 done chan []byte 传出

1
2
3
4
5
6
7
8
9
10
11
12
// deliver is called by the request distributor when a reply to a request arrives.
func (s *scheduler) deliver(sections []uint64, data [][]byte) {
s.lock.Lock()
defer s.lock.Unlock()

for i, section := range sections {
if res := s.responses[section]; res != nil && res.cached == nil { // Avoid non-requests and double deliveries
res.cached = data[i]
close(res.done)
}
}
}

流程如下:

1
2
3
4
5
6
7
8
9
10
                +-----------+         +-----------+
| scheduler | pend | scheduler |
reqs chan --> | routine1 | ------> | routine2 | --> done chan
+-----------+ +-----------+
| ^
| dist chan | response
| |
+--v-------------------+
| outside handler |
+----------------------+

core/bloombits/matcher.go

布隆过滤器的匹配器用于完成实际上的匹配工作。

获取搜索的位置

因为每添加一个元素,都会通过哈希函数得到三个位置,将它们的值置为 1。因此,匹配前首先需要找到这三个位置。至于如何确定位置,可以先了解布隆过滤器如何生成的,参考 core/types/bloom9.go 里面的 bloomValues 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// bloomIndexes represents the bit indexes inside the bloom filter that belong
// to some key.
type bloomIndexes [3]uint

// calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
func calcBloomIndexes(b []byte) bloomIndexes {
b = crypto.Keccak256(b)

var idxs bloomIndexes
for i := 0; i < len(idxs); i++ {
idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
}
return idxs
}

中间过程的数据结构

partialMatches 表示部分匹配的结果。因为一次过滤可能不止一个条件,我们假设有三个条件 A, B, C,对单个条件的匹配叫做子匹配或者部分匹配。其中,bitset 表示这对单个条件的匹配结果的向量,它会在后面通过和其他条件的 bitset 取与,达到同时满足多个条件的效果

1
2
3
4
5
6
7
8
// partialMatches with a non-nil vector represents a section in which some sub-
// matchers have already found potential matches. Subsequent sub-matchers will
// binary AND their matches with this vector. If vector is nil, it represents a
// section to be processed by the first sub-matcher.
type partialMatches struct {
section uint64
bitset []byte
}

Retrieval 表示一次检索任务 Bit 表示检索第几位,Bitsets 表示 Sections 中每个检索结果向量
构成的矩阵。Retrieval 在后面还会被当作传递向 eth 协议中传递数据的中间数据结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Retrieval represents a request for retrieval task assignments for a given
// bit with the given number of fetch elements, or a response for such a request.
// It can also have the actual results set to be used as a delivery data struct.
//
// The contest and error fields are used by the light client to terminate matching
// early if an error is encountered on some path of the pipeline.
type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte

//用于 eth 协议终止过滤的匹配操作
Context context.Context
Error error
}

Matcher 是核心数据结构,它流水线作业,调度实际的检索任务。和 scheduler 的区别在于,scheduler 是任务-请求-结果的宏观的调度器,而 Matcher 是接收已经分配好的请求任务,然后计算检索位置,检索后返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Matcher is a pipelined system of schedulers and logic matchers which perform
// binary AND/OR operations on the bit-streams, creating a stream of potential
// blocks to inspect for data content.
type Matcher struct {
//段的大小,默认 4096 个区块
sectionSize uint64 // Size of the data batches to filter on

filters [][]bloomIndexes // Filter the system is matching for
//一次匹配工作包括多个调度器,因为调度器是按照一个位检索的,一次匹配至少检索 3 个位
schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits

//当需要检索的位置分配好了后,传递检索任务
retrievers chan chan uint // Retriever processes waiting for bit allocations
//当一次检索任务完成时,传递当前完成的任务数量
counters chan chan uint // Retriever processes waiting for task count reports
//当检索任务分配好后,传递检索任务
retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
//当检索完成后,传递任务的结果 response
deliveries chan *Retrieval // Retriever processes waiting for task response deliveries

running uint32 // Atomic flag whether a session is live or not
}

参考