转自公众号:创宇区块链安全实验室
http://mp.weixin.qq.com/s?__biz=Mzg3NjU5NjgwMw==&mid=2247484987&idx=1&sn=d0377a113eebc7e860ca17c15751968f
前言
基本概念
交易流程示意图大致如下所示:
交易池的数据来源: 本地提交,第三方应用通过调用本地以太坊节点的 RPC 服务提交交易; 远程同步,通过广播同步的形式,将其他以太坊节点的交易数据同步至本地节点。 交易池的数据去向:由 miner(矿工) 获取并验证,用于挖矿,挖矿成功后写进区块被广播,交易被写入规范链后会从交易池中进行删除,如果交易被写进分叉则交易池中的交易不会减少,之后等待重新打包。
数据结构
// filedir:go-ethereum-1.10.2\core\tx_pool.go L139
// TxPoolConfig are the configuration parameters of the transaction pool.
type TxPoolConfig struct {
	Locals []common.Address // Addresses registered as local accounts when the pool is created
	NoLocals bool // When true, local transaction handling is disabled (no journal, AddLocals adds as non-local)
	Journal string // Path of the journal file that persists local transactions; empty disables journaling
	Rejournal time.Duration // Interval at which the local transaction journal is rotated (sanitized to >= 1s)
	// Minimum gas price setting; it seeds the pool's gasPrice on creation.
	// NOTE(review): presumably the floor a transaction must meet to be accepted — confirm in validateTx.
	PriceLimit uint64
	PriceBump uint64 // Minimum price bump percentage needed to replace an existing transaction with the same nonce
	AccountSlots uint64 // Limit of executable transactions per account
	GlobalSlots uint64 // Limit of executable transactions across all accounts
	AccountQueue uint64 // Limit of non-executable transactions per account
	GlobalQueue uint64 // Limit of non-executable transactions across all accounts
	Lifetime time.Duration // Maximum time a (non-local) account's transactions may sit in the queue before eviction
}
默认配置如下:
// DefaultTxPoolConfig contains the default configurations for the transaction
// pool.
var DefaultTxPoolConfig = TxPoolConfig{
Journal: \\\"transactions.rlp\\\",
Rejournal: time.Hour,
PriceLimit: 1,
PriceBump: 10,
AccountSlots: 16,
GlobalSlots: 4096,
AccountQueue: 64,
GlobalQueue: 1024,
Lifetime: 3 * time.Hour,
}
TxPool 数据结构如下所示:
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
	config TxPoolConfig // Transaction pool configuration (sanitized in NewTxPool)
	chainconfig *params.ChainConfig // Chain configuration
	chain blockChain // Blockchain interface used to read heads, blocks and state
	gasPrice *big.Int // Initialised from config.PriceLimit in NewTxPool
	txFeed event.Feed // Event feed
	scope event.SubscriptionScope // Subscription scope
	signer types.Signer // Signer used to recover transaction senders
	mu sync.RWMutex // Guards the pool's mutable state
	istanbul bool // Fork indicator whether we are in the istanbul stage.
	eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions.
	currentState *state.StateDB // State at the current chain head (refreshed by reset)
	pendingNonces *txNoncer // Pending state tracking virtual nonces
	currentMaxGas uint64 // Current gas limit for transaction caps
	locals *accountSet // Set of local transaction to exempt from eviction rules
	journal *txJournal // Journal of local transaction to back up to disk
	pending map[common.Address]*txList // All currently processable transactions
	queue map[common.Address]*txList // Queued but non-processable transactions
	beats map[common.Address]time.Time // Last heartbeat from each known account
	all *txLookup // All transactions, to allow lookups
	priced *txPricedList // All transactions sorted by price
	chainHeadCh chan ChainHeadEvent // Receives chain head events from the blockchain subscription
	chainHeadSub event.Subscription // Subscription feeding chainHeadCh; its Err channel signals shutdown to loop
	reqResetCh chan *txpoolResetRequest // Reset requests consumed by scheduleReorgLoop
	reqPromoteCh chan *accountSet // Promotion requests consumed by scheduleReorgLoop
	queueTxEventCh chan *types.Transaction // Transactions whose events are queued by scheduleReorgLoop
	reorgDoneCh chan chan struct{} // scheduleReorgLoop hands back the done-channel of the next reorg run here
	reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
	wg sync.WaitGroup // tracks loop, scheduleReorgLoop
}
基础配置
// filedir:go-ethereum-1.10.2\eth\ethconfig\config.go L42
// FullNodeGPO contains default gasprice oracle settings for full node.
var FullNodeGPO = gasprice.Config{
	Blocks: 20,
	Percentile: 60,
	MaxPrice: gasprice.DefaultMaxPrice,
}

// LightClientGPO contains default gasprice oracle settings for light client.
// It differs from FullNodeGPO only in Blocks (2 instead of 20).
var LightClientGPO = gasprice.Config{
	Blocks: 2,
	Percentile: 60,
	MaxPrice: gasprice.DefaultMaxPrice,
}
// filedir:go-ethereum-1.10.2\eth\gasprice\gasprice.go L34
// DefaultMaxPrice is 500 GWei; it is the MaxPrice used by the default
// gasprice oracle settings (FullNodeGPO and LightClientGPO).
var DefaultMaxPrice = big.NewInt(500 * params.GWei)
// filedir:go-ethereum-1.10.2\\\\core\\\\tx_pool.go L160
// DefaultTxPoolConfig contains the default configurations for the transaction
// pool.
var DefaultTxPoolConfig = TxPoolConfig{
Journal: \\\"transactions.rlp\\\",
Rejournal: time.Hour,
PriceLimit: 1,
PriceBump: 10,
AccountSlots: 16,
GlobalSlots: 4096,
AccountQueue: 64,
GlobalQueue: 1024,
Lifetime: 3 * time.Hour,
}
TxLookupLimit: 2350000,
初始化池
// filedir:go-ethereum-1.10.2\\\\core\\\\tx_pool.go L262
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()
// Create the transaction pool with its initial settings
pool := &TxPool{
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.LatestSigner(chainconfig),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info(\\\"Setting new local account\\\", \\\"address\\\", addr)
pool.locals.add(addr)
}
pool.priced = newTxPricedList(pool.all)
pool.reset(nil, chain.CurrentBlock().Header())
// Start the reorg loop early so it can handle requests generated during journal loading.
pool.wg.Add(1)
go pool.scheduleReorgLoop()
// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != \\\"\\\" {
pool.journal = newTxJournal(config.Journal)
if err := pool.journal.load(pool.AddLocals); err != nil {
log.Warn(\\\"Failed to load transaction journal\\\", \\\"err\\\", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn(\\\"Failed to rotate transaction journal\\\", \\\"err\\\", err)
}
}
// Subscribe events from blockchain and start the main event loop.
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
pool.wg.Add(1)
go pool.loop()
return pool
}
// filedir:go-ethereum-1.10.2\\\\core\\\\tx_pool.go L177
// sanitize checks the provided user configurations and changes anything that\\\'s
// unreasonable or unworkable.
func (config *TxPoolConfig) sanitize() TxPoolConfig {
conf := *config
if conf.Rejournal < time.Second {
log.Warn(\\\"Sanitizing invalid txpool journal time\\\", \\\"provided\\\", conf.Rejournal, \\\"updated\\\", time.Second)
conf.Rejournal = time.Second
}
if conf.PriceLimit < 1 {
log.Warn(\\\"Sanitizing invalid txpool price limit\\\", \\\"provided\\\", conf.PriceLimit, \\\"updated\\\", DefaultTxPoolConfig.PriceLimit)
conf.PriceLimit = DefaultTxPoolConfig.PriceLimit
}
if conf.PriceBump < 1 {
log.Warn(\\\"Sanitizing invalid txpool price bump\\\", \\\"provided\\\", conf.PriceBump, \\\"updated\\\", DefaultTxPoolConfig.PriceBump)
conf.PriceBump = DefaultTxPoolConfig.PriceBump
}
if conf.AccountSlots < 1 {
log.Warn(\\\"Sanitizing invalid txpool account slots\\\", \\\"provided\\\", conf.AccountSlots, \\\"updated\\\", DefaultTxPoolConfig.AccountSlots)
conf.AccountSlots = DefaultTxPoolConfig.AccountSlots
}
if conf.GlobalSlots < 1 {
log.Warn(\\\"Sanitizing invalid txpool global slots\\\", \\\"provided\\\", conf.GlobalSlots, \\\"updated\\\", DefaultTxPoolConfig.GlobalSlots)
conf.GlobalSlots = DefaultTxPoolConfig.GlobalSlots
}
if conf.AccountQueue < 1 {
log.Warn(\\\"Sanitizing invalid txpool account queue\\\", \\\"provided\\\", conf.AccountQueue, \\\"updated\\\", DefaultTxPoolConfig.AccountQueue)
conf.AccountQueue = DefaultTxPoolConfig.AccountQueue
}
if conf.GlobalQueue < 1 {
log.Warn(\\\"Sanitizing invalid txpool global queue\\\", \\\"provided\\\", conf.GlobalQueue, \\\"updated\\\", DefaultTxPoolConfig.GlobalQueue)
conf.GlobalQueue = DefaultTxPoolConfig.GlobalQueue
}
if conf.Lifetime < 1 {
log.Warn(\\\"Sanitizing invalid txpool lifetime\\\", \\\"provided\\\", conf.Lifetime, \\\"updated\\\", DefaultTxPoolConfig.Lifetime)
conf.Lifetime = DefaultTxPoolConfig.Lifetime
}
return conf
}
// Create the transaction pool with its initial settings
pool := &TxPool{
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.LatestSigner(chainconfig),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info(\\\"Setting new local account\\\", \\\"address\\\", addr)
pool.locals.add(addr)
}
pool.priced = newTxPricedList(pool.all)
具体实现代码如下所示:
// filedir:go-ethereum-1.10.2\core\tx_list.go L450
// newTxPricedList creates a new price-sorted transaction heap.
// The list keeps a reference to the given lookup set (all) and starts with an
// empty heap for remote transactions.
func newTxPricedList(all *txLookup) *txPricedList {
	return &txPricedList{
		all: all,
		remotes: new(priceHeap),
	}
}
之后调用 reset 更新交易池:
pool.reset(nil, chain.CurrentBlock().Header())
// reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
func (pool *TxPool) reset(oldHead, newHead *types.Header) {
// If we\\\'re reorging an old state, reinject all dropped transactions
var reinject types.Transactions
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
// If the reorg is too deep, avoid doing it (will happen during fast sync)
oldNum := oldHead.Number.Uint64()
newNum := newHead.Number.Uint64()
if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
log.Debug(\\\"Skipping deep transaction reorg\\\", \\\"depth\\\", depth)
} else {
// Reorg seems shallow enough to pull in all transactions into memory
var discarded, included types.Transactions
var (
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
)
if rem == nil {
// This can happen if a setHead is performed, where we simply discard the old
// head from the chain.
// If that is the case, we don\\\'t have the lost transactions any more, and
// there\\\'s nothing to add
if newNum >= oldNum {
// If we reorged to a same or higher number, then it\\\'s not a case of setHead
log.Warn(\\\"Transaction pool reset with missing oldhead\\\",
\\\"old\\\", oldHead.Hash(), \\\"oldnum\\\", oldNum, \\\"new\\\", newHead.Hash(), \\\"newnum\\\", newNum)
return
}
// If the reorg ended up on a lower number, it\\\'s indicative of setHead being the cause
log.Debug(\\\"Skipping transaction reset caused by setHead\\\",
\\\"old\\\", oldHead.Hash(), \\\"oldnum\\\", oldNum, \\\"new\\\", newHead.Hash(), \\\"newnum\\\", newNum)
// We still need to update the current state s.th. the lost transactions can be readded by the user
} else {
for rem.NumberU64() > add.NumberU64() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error(\\\"Unrooted old chain seen by tx pool\\\", \\\"block\\\", oldHead.Number, \\\"hash\\\", oldHead.Hash())
return
}
}
for add.NumberU64() > rem.NumberU64() {
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error(\\\"Unrooted new chain seen by tx pool\\\", \\\"block\\\", newHead.Number, \\\"hash\\\", newHead.Hash())
return
}
}
for rem.Hash() != add.Hash() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error(\\\"Unrooted old chain seen by tx pool\\\", \\\"block\\\", oldHead.Number, \\\"hash\\\", oldHead.Hash())
return
}
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error(\\\"Unrooted new chain seen by tx pool\\\", \\\"block\\\", newHead.Number, \\\"hash\\\", newHead.Hash())
return
}
}
reinject = types.TxDifference(discarded, included)
}
}
}
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock().Header() // Special case during testing
}
statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
log.Error(\\\"Failed to reset txpool state\\\", \\\"err\\\", err)
return
}
pool.currentState = statedb
pool.pendingNonces = newTxNoncer(statedb)
pool.currentMaxGas = newHead.GasLimit
// Inject any transactions discarded due to reorgs
log.Debug(\\\"Reinjecting stale transactions\\\", \\\"count\\\", len(reinject))
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
// Update all fork indicator by next pending block number.
next := new(big.Int).Add(newHead.Number, big.NewInt(1))
pool.istanbul = pool.chainconfig.IsIstanbul(next)
pool.eip2718 = pool.chainconfig.IsBerlin(next)
}
之后启动 reorg 循环,使其能够处理日志加载期间生成的请求:
pool.wg.Add(1)
go pool.scheduleReorgLoop()
scheduleReorgLoop 具体实现代码如下所示,该函数主要用于 reset 和 promoteExecutable 的执行计划。
// scheduleReorgLoop schedules runs of reset and promoteExecutables. Code above should not
// call those methods directly, but request them being run using requestReset and
// requestPromoteExecutables instead.
//
// At most one runReorg goroutine is active at a time. Requests arriving while a
// run is in progress are merged and served by the next run. Every requester is
// handed (via reorgDoneCh) the done-channel of the run that will serve it.
func (pool *TxPool) scheduleReorgLoop() {
	defer pool.wg.Done()
	var (
		curDone chan struct{} // non-nil while runReorg is active
		nextDone = make(chan struct{}) // done-channel of the next (not yet started) run
		launchNextRun bool // set once a reset or promote request is pending
		reset *txpoolResetRequest // merged pending reset request, if any
		dirtyAccounts *accountSet // merged set of accounts from pending promote requests
		queuedEvents = make(map[common.Address]*txSortedMap) // queued transactions, grouped by sender
	)
	for {
		// Launch next background reorg if needed
		if curDone == nil && launchNextRun {
			// Run the background reorg and announcements
			go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)
			// Prepare everything for the next round of reorg
			curDone, nextDone = nextDone, make(chan struct{})
			launchNextRun = false
			reset, dirtyAccounts = nil, nil
			queuedEvents = make(map[common.Address]*txSortedMap)
		}
		select {
		case req := <-pool.reqResetCh:
			// Reset request: update head if request is already pending.
			if reset == nil {
				reset = req
			} else {
				reset.newHead = req.newHead
			}
			launchNextRun = true
			pool.reorgDoneCh <- nextDone
		case req := <-pool.reqPromoteCh:
			// Promote request: update address set if request is already pending.
			if dirtyAccounts == nil {
				dirtyAccounts = req
			} else {
				dirtyAccounts.merge(req)
			}
			launchNextRun = true
			pool.reorgDoneCh <- nextDone
		case tx := <-pool.queueTxEventCh:
			// Queue up the event, but don't schedule a reorg. It's up to the caller to
			// request one later if they want the events sent.
			addr, _ := types.Sender(pool.signer, tx)
			if _, ok := queuedEvents[addr]; !ok {
				queuedEvents[addr] = newTxSortedMap()
			}
			queuedEvents[addr].Put(tx)
		case <-curDone:
			// The active run has finished; a new one may be launched on the next iteration.
			curDone = nil
		case <-pool.reorgShutdownCh:
			// Wait for current run to finish.
			if curDone != nil {
				<-curDone
			}
			// Closing nextDone releases anyone still waiting on a run that will never start.
			close(nextDone)
			return
		}
	}
}
此时如果本地交易开启那么从本地磁盘加载本地交易。
// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != \\\"\\\" {
pool.journal = newTxJournal(config.Journal)
if err := pool.journal.load(pool.AddLocals); err != nil {
log.Warn(\\\"Failed to load transaction journal\\\", \\\"err\\\", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn(\\\"Failed to rotate transaction journal\\\", \\\"err\\\", err)
}
}
之后订阅相关交易事件并开启主循环:
// Subscribe events from blockchain and start the main event loop.
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
pool.wg.Add(1)
go pool.loop()
return pool
主循环 loop 具体实现代码如下,它是 txPool 的一个 goroutine,也是主要的事件循环,它主要用于等待和响应外部区块链事件以及各种报告和交易驱逐事件:
// loop is the transaction pool\\\'s main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
func (pool *TxPool) loop() {
defer pool.wg.Done()
var (
prevPending, prevQueued, prevStales int
// Start the stats reporting and transaction eviction tickers
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
journal = time.NewTicker(pool.config.Rejournal)
// Track the previous head headers for transaction reorgs
head = pool.chain.CurrentBlock()
)
defer report.Stop()
defer evict.Stop()
defer journal.Stop()
for {
select {
// Handle ChainHeadEvent
case ev := <-pool.chainHeadCh:
if ev.Block != nil {
pool.requestReset(head.Header(), ev.Block.Header())
head = ev.Block
}
// System shutdown.
case <-pool.chainHeadSub.Err():
close(pool.reorgShutdownCh)
return
// Handle stats reporting ticks
case <-report.C:
pool.mu.RLock()
pending, queued := pool.stats()
stales := pool.priced.stales
pool.mu.RUnlock()
if pending != prevPending || queued != prevQueued || stales != prevStales {
log.Debug(\\\"Transaction pool status report\\\", \\\"executable\\\", pending, \\\"queued\\\", queued, \\\"stales\\\", stales)
prevPending, prevQueued, prevStales = pending, queued, stales
}
// Handle inactive account transaction eviction
case <-evict.C:
pool.mu.Lock()
for addr := range pool.queue {
// Skip local transactions from the eviction mechanism
if pool.locals.contains(addr) {
continue
}
// Any non-locals old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.queue[addr].Flatten()
for _, tx := range list {
pool.removeTx(tx.Hash(), true)
}
queuedEvictionMeter.Mark(int64(len(list)))
}
}
pool.mu.Unlock()
// Handle local transaction journal rotation
case <-journal.C:
if pool.journal != nil {
pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn(\\\"Failed to rotate local tx journal\\\", \\\"err\\\", err)
}
pool.mu.Unlock()
}
}
}
}
构建交易
curl -X POST --data '{"jsonrpc":"2.0","method":"eth_sendTransaction","params":[{see below}],"id":1}'
from: DATA,20字节 – 发送交易的源地址 to: DATA,20字节 – 交易的目标地址,当创建新合约时可选
-
gas: QUANTITY – 交易执行可用gas量,可选整数,默认值90000,未用gas将返还
-
gasPrice: QUANTITY – gas价格,可选,默认值:待定(To-Be-Determined)
-
value: QUANTITY – 交易发送的金额,可选整数
-
data: DATA – 合约的编译代码或被调用方法的签名及编码参数
-
nonce: QUANTITY – nonce,可选,可以使用同一个nonce来实现挂起的交易的重写
params: [{
"from": "0xb60e8dd61c5d32be8058bb8eb970870f07233155",
"to": "0xd46e8dd67c5d32be8058bb8eb970870f07244567",
"gas": "0x76c0", // 30400
"gasPrice": "0x9184e72a000", // 10000000000000
"value": "0x9184e72a", // 2441406250
"data": "0xd46e8dd67c5d32be8d46e8dd67c5d32be8058bb8eb970870f072445675058bb8eb970870f072445675"
}]
{
"id":1,
"jsonrpc": "2.0",
"result": "0xe670ec64341771606e55d6b4ca35a1a6b75ee3d5145a99d05921026d1527331"
}
下面我们来跟踪一下 eth_sendTransaction 这一个 RPC 的执行过程,在这里首先查找包含该账户的钱包(即检索账户是否存在),之后检查 Nonce 是否为空,如果为空则对该地址加锁以防止并发时分配到相同的 nonce,紧接着调用 setDefaults 补全默认参数并调用 SignTx 进行签名操作,之后调用 SubmitTransaction 来提交交易:
// filedir:go-ethereum-1.10.2\internal\ethapi\api.go L1736
// SendTransaction creates a transaction for the given argument, sign it and submit it to the
// transaction pool.
//
// It returns the hash reported by SubmitTransaction, or a zero hash together
// with the error of the first step that failed (wallet lookup, defaults,
// signing or submission).
func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args SendTxArgs) (common.Hash, error) {
	// Look up the wallet containing the requested signer
	account := accounts.Account{Address: args.From}
	wallet, err := s.b.AccountManager().Find(account)
	if err != nil {
		return common.Hash{}, err
	}
	if args.Nonce == nil {
		// Hold the address's mutex around signing to prevent concurrent assignment of
		// the same nonce to multiple accounts.
		s.nonceLock.LockAddr(args.From)
		defer s.nonceLock.UnlockAddr(args.From)
	}
	// Set some sanity defaults and terminate on failure
	if err := args.setDefaults(ctx, s.b); err != nil {
		return common.Hash{}, err
	}
	// Assemble the transaction and sign with the wallet
	tx := args.toTransaction()
	signed, err := wallet.SignTx(account, tx, s.b.ChainConfig().ChainID)
	if err != nil {
		return common.Hash{}, err
	}
	return SubmitTransaction(ctx, s.b, signed)
}
SignTx 实现代码如下所示(以 usbwallet 硬件钱包的实现为例),在这里会继续调用驱动层的 SignTx(w.driver.SignTx)进行签名操作,这里不再深入,后续的“交易签名”会进行详细分析:
// filedir:go-ethereum-1.10.2\\\\accounts\\\\usbwallet\\\\wallet.go L581
// SignTx implements accounts.Wallet. It sends the transaction over to the Ledger
// wallet to request a confirmation from the user. It returns either the signed
// transaction or a failure if the user denied the transaction.
//
// Note, if the version of the Ethereum application running on the Ledger wallet is
// too old to sign EIP-155 transactions, but such is requested nonetheless, an error
// will be returned opposed to silently signing in Homestead mode.
func (w *wallet) SignTx(account accounts.Account, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error) {
w.stateLock.RLock() // Comms have own mutex, this is for the state fields
defer w.stateLock.RUnlock()
// If the wallet is closed, abort
if w.device == nil {
return nil, accounts.ErrWalletClosed
}
// Make sure the requested account is contained within
path, ok := w.paths[account.Address]
if !ok {
return nil, accounts.ErrUnknownAccount
}
// All infos gathered and metadata checks out, request signing
<-w.commsLock
defer func() { w.commsLock <- struct{}{} }()
// Ensure the device isn\\\'t screwed with while user confirmation is pending
// TODO(karalabe): remove if hotplug lands on Windows
w.hub.commsLock.Lock()
w.hub.commsPend++
w.hub.commsLock.Unlock()
defer func() {
w.hub.commsLock.Lock()
w.hub.commsPend--
w.hub.commsLock.Unlock()
}()
// Sign the transaction and verify the sender to avoid hardware fault surprises
sender, signed, err := w.driver.SignTx(path, tx, chainID)
if err != nil {
return nil, err
}
if sender != account.Address {
return nil, fmt.Errorf(\\\"signer mismatch: expected %s, got %s\\\", account.Address.Hex(), sender.Hex())
}
return signed, nil
}
签名之后返回 SendTransaction 中去调用 SubmitTransaction 来提交已签名的交易,在这里会首先通过 checkTxFee 检查交易费用是否超出节点设置的上限(RPCTxFeeCap),并确认交易带有重放保护(EIP-155,除非节点允许未受保护的交易),之后调用 SendTx 来发送交易:
// SubmitTransaction is a helper function that submits tx to txPool and logs a message.
func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
// If the transaction fee cap is already specified, ensure the
// fee of the given transaction is _reasonable_.
if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil {
return common.Hash{}, err
}
if !b.UnprotectedAllowed() && !tx.Protected() {
// Ensure only eip155 signed transactions are submitted if EIP155Required is set.
return common.Hash{}, errors.New(\\\"only replay-protected (EIP-155) transactions allowed over RPC\\\")
}
if err := b.SendTx(ctx, tx); err != nil {
return common.Hash{}, err
}
// Print a log with full tx details for manual investigations and interventions
signer := types.MakeSigner(b.ChainConfig(), b.CurrentBlock().Number())
from, err := types.Sender(signer, tx)
if err != nil {
return common.Hash{}, err
}
if tx.To() == nil {
addr := crypto.CreateAddress(from, tx.Nonce())
log.Info(\\\"Submitted contract creation\\\", \\\"hash\\\", tx.Hash().Hex(), \\\"from\\\", from, \\\"nonce\\\", tx.Nonce(), \\\"contract\\\", addr.Hex(), \\\"value\\\", tx.Value())
} else {
log.Info(\\\"Submitted transaction\\\", \\\"hash\\\", tx.Hash().Hex(), \\\"from\\\", from, \\\"nonce\\\", tx.Nonce(), \\\"recipient\\\", tx.To(), \\\"value\\\", tx.Value())
}
return tx.Hash(), nil
}
SendTx 的具体实现如下,在这里会调用 AddLocal 来添加交易到交易池中去,这里不再深入,后续会有“添加交易”这一个分析单元模块:
// SendTx hands the signed transaction to the transaction pool through AddLocal
// and returns the error reported by the pool, if any.
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
	return b.eth.txPool.AddLocal(signedTx)
}
之后检查接收地址是否为空,如果为空(即合约创建交易)则根据发送者地址和 nonce 计算出将要创建的合约地址,之后打印一份完整的TX详细信息的日志便于后续手动调查分析,之后返回交易的 hash 值:
if tx.To() == nil {
addr := crypto.CreateAddress(from, tx.Nonce())
log.Info(\\\"Submitted contract creation\\\", \\\"hash\\\", tx.Hash().Hex(), \\\"from\\\", from, \\\"nonce\\\", tx.Nonce(), \\\"contract\\\", addr.Hex(), \\\"value\\\", tx.Value())
} else {
log.Info(\\\"Submitted transaction\\\", \\\"hash\\\", tx.Hash().Hex(), \\\"from\\\", from, \\\"nonce\\\", tx.Nonce(), \\\"recipient\\\", tx.To(), \\\"value\\\", tx.Value())
}
return tx.Hash(), nil
交易入池
// filedir:go-ethereum-1.10.2\core\tx_pool.go L755
// AddLocals enqueues a batch of transactions into the pool if they are valid, marking the
// senders as a local ones, ensuring they go around the local pricing constraints.
//
// This method is used to add transactions from the RPC API and performs synchronous pool
// reorganization and event propagation.
//
// The returned slice holds one error slot per input transaction.
func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
	// Transactions are only flagged as local when local handling is not disabled.
	return pool.addTxs(txs, !pool.config.NoLocals, true)
}
// AddLocal enqueues a single local transaction into the pool if it is valid. This is
// a convenience wrapper around AddLocals.
func (pool *TxPool) AddLocal(tx *types.Transaction) error {
	errs := pool.AddLocals([]*types.Transaction{tx})
	// A single transaction was submitted, so its result is in the first slot.
	return errs[0]
}
// AddRemotes enqueues a batch of transactions into the pool if they are valid. If the
// senders are not among the locally tracked ones, full pricing constraints will apply.
//
// This method is used to add transactions from the p2p network and does not wait for pool
// reorganization and internal event propagation.
//
// The returned slice holds one error slot per input transaction.
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
	// local=false, sync=false: do not wait for the reorg to complete.
	return pool.addTxs(txs, false, false)
}
// AddRemotesSync is like AddRemotes, but waits for pool reorganization. Tests use this method.
func (pool *TxPool) AddRemotesSync(txs []*types.Transaction) []error {
	// local=false, sync=true: block until the requested reorg has run.
	return pool.addTxs(txs, false, true)
}
addTxs 代码如下所示:
// addTxs attempts to queue a batch of transactions if they are valid.
//
// local marks the transactions as local ones. When sync is true the call
// blocks until the pool reorganization requested for the affected accounts has
// completed. The returned slice has one slot per input transaction, in input
// order; a nil slot means the transaction was accepted.
func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
	// Filter out known ones without obtaining the pool lock or recovering signatures
	var (
		errs = make([]error, len(txs))
		news = make([]*types.Transaction, 0, len(txs))
	)
	for i, tx := range txs {
		// If the transaction is known, pre-set the error slot
		if pool.all.Get(tx.Hash()) != nil {
			errs[i] = ErrAlreadyKnown
			knownTxMeter.Mark(1)
			continue
		}
		// Exclude transactions with invalid signatures as soon as
		// possible and cache senders in transactions before
		// obtaining lock
		_, err := types.Sender(pool.signer, tx)
		if err != nil {
			errs[i] = ErrInvalidSender
			invalidTxMeter.Mark(1)
			continue
		}
		// Accumulate all unknown transactions for deeper processing
		news = append(news, tx)
	}
	if len(news) == 0 {
		return errs
	}
	// Process all the new transaction and merge any errors into the original slice
	pool.mu.Lock()
	newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
	pool.mu.Unlock()
	// newErrs is indexed by position in news; map each entry back to the next
	// slot of errs that was not pre-set by the filtering loop above.
	var nilSlot = 0
	for _, err := range newErrs {
		for errs[nilSlot] != nil {
			nilSlot++
		}
		errs[nilSlot] = err
		nilSlot++
	}
	// Reorg the pool internals if needed and return
	done := pool.requestPromoteExecutables(dirtyAddrs)
	if sync {
		<-done
	}
	return errs
}
首先会对交易进行过滤,检查是否是一个已知的交易(即添加过或广播过的),之后调用 types.Sender 函数校验通过 secp256k1 椭圆曲线从签名(v,r,s)派生的地址,如果派生失败或签名不正确,则将该交易标记为 ErrInvalidSender 错误并跳过:
var (
errs = make([]error, len(txs))
news = make([]*types.Transaction, 0, len(txs))
)
for i, tx := range txs {
// If the transaction is known, pre-set the error slot
if pool.all.Get(tx.Hash()) != nil {
errs[i] = ErrAlreadyKnown
knownTxMeter.Mark(1)
continue
}
// Exclude transactions with invalid signatures as soon as
// possible and cache senders in transactions before
// obtaining lock
_, err := types.Sender(pool.signer, tx)
if err != nil {
errs[i] = ErrInvalidSender
invalidTxMeter.Mark(1)
continue
}
// Accumulate all unknown transactions for deeper processing
news = append(news, tx)
}
if len(news) == 0 {
return errs
}
// Process all the new transaction and merge any errors into the original slice
pool.mu.Lock()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
pool.mu.Unlock()
addTxsLocked 的具体实现如下所示,它会将有效的交易进行排队处理,同时调用 pool.add 函数将交易添加到交易队列中去:
// addTxsLocked attempts to queue a batch of transactions if they are valid.
// The transaction pool lock must be held.
//
// It returns one error slot per transaction (in input order) together with the
// set of accounts that received a newly inserted transaction.
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
	dirty := newAccountSet(pool.signer)
	errs := make([]error, len(txs))
	for i, tx := range txs {
		replaced, err := pool.add(tx, local)
		errs[i] = err
		// Only successful insertions that did not replace an existing transaction mark the sender as dirty.
		if err == nil && !replaced {
			dirty.addTx(tx)
		}
	}
	validTxMeter.Mark(int64(len(dirty.accounts)))
	return errs, dirty
}
add 函数的具体实现如下所示:
// add validates a transaction and inserts it into the non-executable queue for later
// pending promotion and execution. If the transaction is a replacement for an already
// pending or queued one, it overwrites the previous transaction if its price is higher.
//
// If a newly added transaction is marked as local, its sending account will be
// whitelisted, preventing any associated transaction from being dropped out of the pool
// due to pricing constraints.
func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
// If the transaction is already known, discard it
hash := tx.Hash()
if pool.all.Get(hash) != nil {
log.Trace(\\\"Discarding already known transaction\\\", \\\"hash\\\", hash)
knownTxMeter.Mark(1)
return false, ErrAlreadyKnown
}
// Make the local flag. If it\\\'s from local source or it\\\'s from the network but
// the sender is marked as local previously, treat it as the local transaction.
isLocal := local || pool.locals.containsTx(tx)
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, isLocal); err != nil {
log.Trace(\\\"Discarding invalid transaction\\\", \\\"hash\\\", hash, \\\"err\\\", err)
invalidTxMeter.Mark(1)
return false, err
}
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Count()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don\\\'t accept it
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace(\\\"Discarding underpriced transaction\\\", \\\"hash\\\", hash, \\\"price\\\", tx.GasPrice())
underpricedTxMeter.Mark(1)
return false, ErrUnderpriced
}
// New transaction is better than our worse ones, make room for it.
// If it\\\'s a local transaction, forcibly discard all available transactions.
// Otherwise if we can\\\'t make enough room for new one, abort the operation.
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)
// Special case, we still can\\\'t make the room for the new remote one.
if !isLocal && !success {
log.Trace(\\\"Discarding overflown transaction\\\", \\\"hash\\\", hash)
overflowedTxMeter.Mark(1)
return false, ErrTxPoolOverflow
}
// Kick out the underpriced remote transactions.
for _, tx := range drop {
log.Trace(\\\"Discarding freshly underpriced transaction\\\", \\\"hash\\\", tx.Hash(), \\\"price\\\", tx.GasPrice())
underpricedTxMeter.Mark(1)
pool.removeTx(tx.Hash(), false)
}
}
// Try to replace an existing transaction in the pending pool
from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pendingDiscardMeter.Mark(1)
return false, ErrReplaceUnderpriced
}
// New transaction is better, replace old one
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
}
pool.all.Add(tx, isLocal)
pool.priced.Put(tx, isLocal)
pool.journalTx(from, tx)
pool.queueTxEvent(tx)
log.Trace(\\\"Pooled new executable transaction\\\", \\\"hash\\\", hash, \\\"from\\\", from, \\\"to\\\", tx.To())
// Successful promotion, bump the heartbeat
pool.beats[from] = time.Now()
return old != nil, nil
}
// New transaction isn\\\'t replacing a pending one, push into queue
replaced, err = pool.enqueueTx(hash, tx, isLocal, true)
if err != nil {
return false, err
}
// Mark local addresses and journal local transactions
if local && !pool.locals.contains(from) {
log.Info(\\\"Setting new local account\\\", \\\"address\\\", from)
pool.locals.add(from)
pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it\\\'s marked as local first time.
}
if isLocal {
localGauge.Inc(1)
}
pool.journalTx(from, tx)
log.Trace(\\\"Pooled new future transaction\\\", \\\"hash\\\", hash, \\\"from\\\", from, \\\"to\\\", tx.To())
return replaced, nil
}
在这里会首先检查当前的交易是否已经知晓(即被广播过或者添加到池子里过),如果已知晓则直接丢弃:
hash := tx.Hash()
if pool.all.Get(hash) != nil {
log.Trace(\\\"Discarding already known transaction\\\", \\\"hash\\\", hash)
knownTxMeter.Mark(1)
return false, ErrAlreadyKnown
}
之后鉴别交易是本地提交还是远程提交,并调用函数 validateTx 来验证交易,如果验证不通过则直接丢弃:
// Make the local flag. If it\\\'s from local source or it\\\'s from the network but
// the sender is marked as local previously, treat it as the local transaction.
isLocal := local || pool.locals.containsTx(tx)
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, isLocal); err != nil {
log.Trace(\\\"Discarding invalid transaction\\\", \\\"hash\\\", hash, \\\"err\\\", err)
invalidTxMeter.Mark(1)
return false, err
}
之后检查交易池是否满了,如果满了则放弃交易队列中定价过低的交易,GlobalSlots 和 GlobalQueue 为 pending 和 queue 的最大容量:
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Count()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don\\\'t accept it
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace(\\\"Discarding underpriced transaction\\\", \\\"hash\\\", hash, \\\"price\\\", tx.GasPrice())
underpricedTxMeter.Mark(1)
return false, ErrUnderpriced
}
// New transaction is better than our worse ones, make room for it.
// If it\\\'s a local transaction, forcibly discard all available transactions.
// Otherwise if we can\\\'t make enough room for new one, abort the operation.
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)
// Special case, we still can\\\'t make the room for the new remote one.
if !isLocal && !success {
log.Trace(\\\"Discarding overflown transaction\\\", \\\"hash\\\", hash)
overflowedTxMeter.Mark(1)
return false, ErrTxPoolOverflow
}
// Kick out the underpriced remote transactions.
for _, tx := range drop {
log.Trace(\\\"Discarding freshly underpriced transaction\\\", \\\"hash\\\", tx.Hash(), \\\"price\\\", tx.GasPrice())
underpricedTxMeter.Mark(1)
pool.removeTx(tx.Hash(), false)
}
}
之后判断当前交易在 pending 队列中是否存在 nonce 值相同的交易,如果存在则判断当前交易所设置的 gasprice 涨幅是否达到设置的 PriceBump 百分比,达到则替换覆盖已存在的交易并直接返回,否则返回 ErrReplaceUnderpriced(替换交易 Gasprice 过低)并丢弃该交易;只有当 pending 中不存在相同 nonce 的交易时,才会在后续步骤中通过 enqueueTx 把它放到 queue 队列中:
// Try to replace an existing transaction in the pending pool
from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pendingDiscardMeter.Mark(1)
return false, ErrReplaceUnderpriced
}
// New transaction is better, replace old one
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
}
pool.all.Add(tx, isLocal)
pool.priced.Put(tx, isLocal)
pool.journalTx(from, tx)
pool.queueTxEvent(tx)
log.Trace(\\\"Pooled new executable transaction\\\", \\\"hash\\\", hash, \\\"from\\\", from, \\\"to\\\", tx.To())
// Successful promotion, bump the heartbeat
pool.beats[from] = time.Now()
return old != nil, nil
}
之后调用 enqueueTx 将交易添加到交易队列中去,同时检查该交易是否为本地提交且 from 账户尚未被标记为本地地址,如果是则把 from 添加到交易池的本地地址集合中去,最后对交易进行 journal 记录:
// New transaction isn\\\'t replacing a pending one, push into queue
replaced, err = pool.enqueueTx(hash, tx, isLocal, true)
if err != nil {
return false, err
}
// Mark local addresses and journal local transactions
if local && !pool.locals.contains(from) {
log.Info(\\\"Setting new local account\\\", \\\"address\\\", from)
pool.locals.add(from)
pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it\\\'s marked as local first time.
}
if isLocal {
localGauge.Inc(1)
}
pool.journalTx(from, tx)
log.Trace(\\\"Pooled new future transaction\\\", \\\"hash\\\", hash, \\\"from\\\", from, \\\"to\\\", tx.To())
return replaced, nil
enqueueTx 代码如下所示,该函数主要将新的交易插入到交易队列中去:
// enqueueTx inserts a new transaction into the non-executable transaction queue.
//
// Parameters:
//   - hash:   hash of tx, used only for the lookup-set sanity check below.
//   - tx:     the transaction to queue; its sender is recovered with pool.signer.
//   - local:  forwarded to pool.all.Add / pool.priced.Put when addAll is set.
//   - addAll: when true the transaction is also registered in pool.all and
//     pool.priced; when false the caller expects it to be there already.
//
// It returns true if an already queued transaction was replaced, and
// ErrReplaceUnderpriced if the per-account list refused the insertion.
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated
// Lazily create the per-account queue list on first use.
if pool.queue[from] == nil {
pool.queue[from] = newTxList(false)
}
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
queuedDiscardMeter.Mark(1)
return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
queuedReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the queued counter
queuedGauge.Inc(1)
}
// If the transaction isn't in lookup set but it's expected to be there,
// show the error log.
if pool.all.Get(hash) == nil && !addAll {
log.Error(\\\"Missing transaction in lookup set, please report the issue\\\", \\\"hash\\\", hash)
}
// Only brand-new transactions are added to the lookup set and price index.
if addAll {
pool.all.Add(tx, local)
pool.priced.Put(tx, local)
}
// If we never record the heartbeat, do it right now.
if _, exist := pool.beats[from]; !exist {
pool.beats[from] = time.Now()
}
return old != nil, nil
}
最后回到 addTx 函数中,在这里会调用 requestPromoteExecutables 函数发起一次交易提升请求,它主要将交易从 queue 提升到 pending 中去:
// Reorg the pool internals if needed and return
done := pool.requestPromoteExecutables(dirtyAddrs)
if sync {
<-done
}
交易签名
// filedir:go-ethereum-1.10.2\accounts\usbwallet\wallet.go L582
// SignTx implements accounts.Wallet. It sends the transaction over to the Ledger
// wallet to request a confirmation from the user. It returns either the signed
// transaction or a failure if the user denied the transaction.
//
// It fails with accounts.ErrWalletClosed when no device is attached, with
// accounts.ErrUnknownAccount when the account has no known derivation path,
// and with a "signer mismatch" error when the address reported by the driver
// differs from the requested account.
//
// Note, if the version of the Ethereum application running on the Ledger wallet is
// too old to sign EIP-155 transactions, but such is requested nonetheless, an error
// will be returned opposed to silently signing in Homestead mode.
func (w *wallet) SignTx(account accounts.Account, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error) {
w.stateLock.RLock() // Comms have own mutex, this is for the state fields
defer w.stateLock.RUnlock()
// If the wallet is closed, abort
if w.device == nil {
return nil, accounts.ErrWalletClosed
}
// Make sure the requested account is contained within
path, ok := w.paths[account.Address]
if !ok {
return nil, accounts.ErrUnknownAccount
}
// All infos gathered and metadata checks out, request signing
// NOTE(review): commsLock is received from here and sent back in the deferred
// func, i.e. it looks like a channel used as a lock token - confirm.
<-w.commsLock
defer func() { w.commsLock <- struct{}{} }()
// Ensure the device isn't screwed with while user confirmation is pending
// TODO(karalabe): remove if hotplug lands on Windows
// The hub-wide pending counter is incremented now and decremented on return.
w.hub.commsLock.Lock()
w.hub.commsPend++
w.hub.commsLock.Unlock()
defer func() {
w.hub.commsLock.Lock()
w.hub.commsPend--
w.hub.commsLock.Unlock()
}()
// Sign the transaction and verify the sender to avoid hardware fault surprises
sender, signed, err := w.driver.SignTx(path, tx, chainID)
if err != nil {
return nil, err
}
if sender != account.Address {
return nil, fmt.Errorf(\\\"signer mismatch: expected %s, got %s\\\", account.Address.Hex(), sender.Hex())
}
return signed, nil
}
// SignTx implements accounts.Wallet. The transaction is handed to the shared
// keystore backend for signing, but only if this wallet wraps the requested
// account; otherwise accounts.ErrUnknownAccount is returned so that accounts
// held by other wallets are not leaked through this one.
func (w *keystoreWallet) SignTx(account accounts.Account, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error) {
	// Refuse accounts this wallet does not wrap.
	if known := w.Contains(account); !known {
		return nil, accounts.ErrUnknownAccount
	}
	// The account is ours, let the keystore do the signing.
	return w.keystore.SignTx(account, tx, chainID)
}
校验过账户的有效性后我们可以通过 SignTx 来使用 keystore 进行签名处理,在这里先查找已解锁的私钥,紧接着调用 LatestSignerForChainID 根据 chainID 获取对应的签名器(Signer),再交给 types.SignTx 完成签名:
// SignTx signs the given transaction with the requested account.
//
// The account must currently be unlocked in this keystore, otherwise
// ErrLocked is returned. The keystore read lock is held for the whole call.
func (ks *KeyStore) SignTx(a accounts.Account, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error) {
	ks.mu.RLock()
	defer ks.mu.RUnlock()

	// Without an unlocked key for the address there is nothing to sign with.
	key, ok := ks.unlocked[a.Address]
	if !ok {
		return nil, ErrLocked
	}
	// Depending on the presence of the chain ID, sign with 2718 or homestead.
	return types.SignTx(tx, types.LatestSignerForChainID(chainID), key.PrivateKey)
}
之后在 SignTx 函数中使用私钥进行签名:
// SignTx signs the transaction using the given signer and private key.
//
// The signer supplies the hash to be signed; the resulting signature is
// attached to the transaction through tx.WithSignature.
func SignTx(tx *Transaction, s Signer, prv *ecdsa.PrivateKey) (*Transaction, error) {
	digest := s.Hash(tx)

	signature, err := crypto.Sign(digest[:], prv)
	if err != nil {
		return nil, err
	}
	return tx.WithSignature(s, signature)
}
在 Sign 中使用 ECDSA(椭圆曲线数字签名算法)进行签名,之后返回签名的结果:
// Sign calculates an ECDSA signature.
//
// This function is susceptible to chosen plaintext attacks that can leak
// information about the private key that is used for signing. Callers must
// be aware that the given digest cannot be chosen by an adversary. Common
// solution is to hash any input before calculating the signature.
//
// digestHash must be exactly DigestLength bytes long, otherwise an error is
// returned without signing.
//
// The produced signature is in the [R || S || V] format where V is 0 or 1.
func Sign(digestHash []byte, prv *ecdsa.PrivateKey) (sig []byte, err error) {
if len(digestHash) != DigestLength {
return nil, fmt.Errorf(\\\"hash is required to be exactly %d bytes (%d)\\\", DigestLength, len(digestHash))
}
// Serialise the private scalar D into a buffer of BitSize/8 bytes.
seckey := math.PaddedBigBytes(prv.D, prv.Params().BitSize/8)
// NOTE(review): zeroBytes presumably wipes the key copy once signing is done - confirm.
defer zeroBytes(seckey)
return secp256k1.Sign(digestHash, seckey)
}
交易验证
用户完成一笔交易的签名时,需要将交易提交到区块链网络中,使交易能够尽快确认,节点在提交交易之前需要先验证交易,确认交易的合法性; 节点收到其他节点广播的交易时,节点需要先验证交易是否合法,合法的交易才会加入节点的交易池; 当一个挖矿节点成功计算出符合要求的哈希值后,节点会将交易池中的交易打包到区块中,节点在打包交易的时候需要验证交易的合法性; 节点收到其他节点同步到的区块时,也需要验证区块中包含的交易。
// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
//
// The checks run in a fixed order and the first failing one decides the error:
// transaction type, encoded size, value sign, block gas limit, signature,
// minimum gas price (remote transactions only), nonce, balance and finally
// intrinsic gas. A nil result means every check passed.
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
	// Stateless checks that need nothing but the transaction and pool limits.
	switch {
	case !pool.eip2718 && tx.Type() != types.LegacyTxType:
		// Accept only legacy transactions until EIP-2718/2930 activates.
		return ErrTxTypeNotSupported
	case uint64(tx.Size()) > txMaxSize:
		// Reject transactions over defined size to prevent DOS attacks.
		return ErrOversizedData
	case tx.Value().Sign() < 0:
		// Negative values can't come out of RLP decoding, but a transaction
		// built through the RPC may carry one.
		return ErrNegativeValue
	case pool.currentMaxGas < tx.Gas():
		// The transaction must fit into the current block gas limit.
		return ErrGasLimit
	}
	// Make sure the transaction is signed properly.
	sender, err := types.Sender(pool.signer, tx)
	if err != nil {
		return ErrInvalidSender
	}
	// Checks against the pool price floor and the sender's current state.
	switch {
	case !local && tx.GasPriceIntCmp(pool.gasPrice) < 0:
		// Remote transactions must pay at least our minimal accepted gas price.
		return ErrUnderpriced
	case pool.currentState.GetNonce(sender) > tx.Nonce():
		// The nonce has already been used by the account.
		return ErrNonceTooLow
	case pool.currentState.GetBalance(sender).Cmp(tx.Cost()) < 0:
		// The sender must be able to cover cost == V + GP * GL.
		return ErrInsufficientFunds
	}
	// Ensure the transaction has more gas than the basic tx fee.
	floor, err := IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul)
	if err != nil {
		return err
	}
	if tx.Gas() < floor {
		return ErrIntrinsicGas
	}
	return nil
}
交易升级
交易升级主要是指将交易放入 pending 列表中去,该方法与 add 方法的不同之处在于 add 函数是将新获得的交易放入交易池(替换 pending 中 nonce 相同的交易,或者放入 queue),而 promoteExecutables 则是把给定的账号地址列表中可以执行的交易从 queue 列表中移入 pending 中,并检查失效的交易,然后发送交易池更新事件,其实现代码如下所示:
// filedir:go-ethereum-1.10.2\core\tx_pool.go L1210
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
//
// accounts lists the addresses whose queues should be examined; addresses
// without a queue entry are skipped. The returned slice holds every
// transaction that promoteTx accepted, across all processed accounts.
func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction
// Iterate over all accounts and promote any executable transactions
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
forwards := list.Forward(pool.currentState.GetNonce(addr))
for _, tx := range forwards {
hash := tx.Hash()
pool.all.Remove(hash)
}
log.Trace(\\\"Removed old queued transactions\\\", \\\"count\\\", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
}
log.Trace(\\\"Removed unpayable queued transactions\\\", \\\"count\\\", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))
// Gather all executable transactions and promote them
readies := list.Ready(pool.pendingNonces.get(addr))
for _, tx := range readies {
hash := tx.Hash()
if pool.promoteTx(addr, hash, tx) {
promoted = append(promoted, tx)
}
}
// NOTE(review): promoted accumulates over all accounts, so the count logged
// here is cumulative rather than per account.
log.Trace(\\\"Promoted queued transactions\\\", \\\"count\\\", len(promoted))
// Every ready transaction left the queue, whether or not promoteTx kept it.
queuedGauge.Dec(int64(len(readies)))
// Drop all transactions over the allowed limit
var caps types.Transactions
// Accounts marked as local are exempt from the per-account queue cap.
if !pool.locals.contains(addr) {
caps = list.Cap(int(pool.config.AccountQueue))
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
log.Trace(\\\"Removed cap-exceeding queued transaction\\\", \\\"hash\\\", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
}
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
}
}
return promoted
}
在这里通过一个 for 循环来迭代所有的账户并升级交易,在这里首先将所有 queue 中 nonce 低于账户当前 nonce 的交易删除:
// Iterate over all accounts and promote any executable transactions
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
forwards := list.Forward(pool.currentState.GetNonce(addr))
for _, tx := range forwards {
hash := tx.Hash()
pool.all.Remove(hash)
}
log.Trace(\\\"Removed old queued transactions\\\", \\\"count\\\", len(forwards))
之后将所有 queue 中消费大于账户所持余额或者 gas 大于最大 gas 限制的交易移除:
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
}
log.Trace(\\\"Removed unpayable queued transactions\\\", \\\"count\\\", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))
之后将所有可执行的交易从 queue 里面添加到 pending 里面,在这里会调用 promoteTx 方法将队列中的交易( Txs )放入 pending:
// Gather all executable transactions and promote them
readies := list.Ready(pool.pendingNonces.get(addr))
for _, tx := range readies {
hash := tx.Hash()
if pool.promoteTx(addr, hash, tx) {
promoted = append(promoted, tx)
}
}
log.Trace(\\\"Promoted queued transactions\\\", \\\"count\\\", len(promoted))
queuedGauge.Dec(int64(len(readies)))
promoteTx 实现代码如下所示,该函数首先尝试将交易插入到 pending 队列中去:如果 pending 中已有旧交易且当前交易未能满足替换条件(替换的标准与 PriceBump 设定有关,默认配置为 10,即 Gasprice 需要达到原交易的 110%),则认为旧交易更好并删除当前这个交易;如果当前交易相较于旧的交易更好则删除旧的交易,之后更新该账户的 pending nonce 和心跳时间:
// promoteTx adds a transaction to the pending (processable) list of transactions
// and returns whether it was inserted or an older was better.
//
// On rejection the transaction is removed from the lookup set and the price
// index. On success any transaction it replaced is removed instead, the
// account's pending nonce becomes tx.Nonce()+1 and its heartbeat is refreshed.
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
	// Fetch the account's pending list, creating it on first use.
	pending := pool.pending[addr]
	if pending == nil {
		pending = newTxList(true)
		pool.pending[addr] = pending
	}
	accepted, replaced := pending.Add(tx, pool.config.PriceBump)
	if !accepted {
		// An older transaction was better, discard this one.
		pool.all.Remove(hash)
		pool.priced.Removed(1)
		pendingDiscardMeter.Mark(1)
		return false
	}
	if replaced == nil {
		// Nothing was replaced, bump the pending counter.
		pendingGauge.Inc(1)
	} else {
		// The new transaction won, drop the one it replaced.
		pool.all.Remove(replaced.Hash())
		pool.priced.Removed(1)
		pendingReplaceMeter.Mark(1)
	}
	// Set the potentially new pending nonce.
	pool.pendingNonces.set(addr, tx.Nonce()+1)
	// Successful promotion, bump the heartbeat.
	pool.beats[addr] = time.Now()
	return true
}
之后回到 promoteExecutables 函数中,对于非本地账户,如果其 queue 中的交易数量超过限制( AccountQueue ),则将超出限制的交易移除:
// Drop all transactions over the allowed limit
var caps types.Transactions
if !pool.locals.contains(addr) {
caps = list.Cap(int(pool.config.AccountQueue))
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
log.Trace(\\\"Removed cap-exceeding queued transaction\\\", \\\"hash\\\", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
}
最后记录移除的条目并更新 queuedGauge,如果队列中此账户的交易为空则删除此账户:
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
}
}
return promoted
交易降级
分叉导致 Account 的 Nonce 值降低:假如原规范链A上交易序号m花费了20,且已经上链,而分叉后新规范链上交易序号m未上链,从而导致在规范链上记录的账户的 Nonce 降低,这样交易m就必须要回滚到交易池,放到 queue 中去; 分叉后出现间隙:这种问题出现通常是因为交易余额问题导致的,假如原规范链上交易m花费100,分叉后该账户又发出一个交易m花费200,这就导致该账户余额本来可以支付原来规范链上的某笔交易,但在新的规范链上可能就不够了,这个余额不足的交易如果是m+3,那么在m+2,m+4号交易之间就出现了空隙,这就导致从m+3开始往后所有的交易都要降级; 分叉导致 pending 最前一个交易的 nonce 值与状态的 nonce 值不等。
// demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue.
//
// It walks every account in pool.pending, takes no arguments and returns
// nothing; all effects are on the pool's internal lists, lookup set and metrics.
func (pool *TxPool) demoteUnexecutables() {
// Iterate over all accounts and demote any non-executable transactions
for addr, list := range pool.pending {
nonce := pool.currentState.GetNonce(addr)
// Drop all transactions that are deemed too old (low nonce)
olds := list.Forward(nonce)
for _, tx := range olds {
hash := tx.Hash()
pool.all.Remove(hash)
log.Trace(\\\"Removed old pending transaction\\\", \\\"hash\\\", hash)
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace(\\\"Removed unpayable pending transaction\\\", \\\"hash\\\", hash)
pool.all.Remove(hash)
}
// Only olds and drops leave the pool entirely; invalids are re-queued below.
pool.priced.Removed(len(olds) + len(drops))
pendingNofundsMeter.Mark(int64(len(drops)))
for _, tx := range invalids {
hash := tx.Hash()
log.Trace(\\\"Demoting pending transaction\\\", \\\"hash\\\", hash)
// Internal shuffle shouldn't touch the lookup set.
// NOTE(review): both return values of enqueueTx are ignored here.
pool.enqueueTx(hash, tx, false, false)
}
pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
}
// If there's a gap in front, alert (should never happen) and postpone all transactions
if list.Len() > 0 && list.txs.Get(nonce) == nil {
// Capping the list at zero hands back every remaining pending transaction.
gapped := list.Cap(0)
for _, tx := range gapped {
hash := tx.Hash()
log.Error(\\\"Demoting invalidated transaction\\\", \\\"hash\\\", hash)
// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false)
}
pendingGauge.Dec(int64(len(gapped)))
// This might happen in a reorg, so log it to the metering
blockReorgInvalidatedTx.Mark(int64(len(gapped)))
}
// Delete the entire pending entry if it became empty.
if list.Empty() {
delete(pool.pending, addr)
}
}
}
池子重置
// filedir:go-ethereum-1.10.2\core\tx_pool.go L1120
// reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
//
// oldHead is the header the pool was previously synced to (may be nil) and
// newHead the header to sync to. When oldHead is not the parent of newHead the
// two chains are walked back to their common ancestor, and transactions found
// only on the old branch are re-injected into the pool. The function returns
// early, leaving the pool state untouched, if a block on either branch cannot
// be loaded or the state at newHead.Root is unavailable.
func (pool *TxPool) reset(oldHead, newHead *types.Header) {
// If we're reorging an old state, reinject all dropped transactions
var reinject types.Transactions
// NOTE(review): newHead is dereferenced here whenever oldHead is non-nil,
// while the nil check for newHead only happens further down.
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
// If the reorg is too deep, avoid doing it (will happen during fast sync)
oldNum := oldHead.Number.Uint64()
newNum := newHead.Number.Uint64()
// Depth is the absolute difference between the two head numbers.
if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
log.Debug(\\\"Skipping deep transaction reorg\\\", \\\"depth\\\", depth)
} else {
// Reorg seems shallow enough to pull in all transactions into memory
var discarded, included types.Transactions
var (
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
)
if rem == nil {
// This can happen if a setHead is performed, where we simply discard the old
// head from the chain.
// If that is the case, we don't have the lost transactions any more, and
// there's nothing to add
if newNum >= oldNum {
// If we reorged to a same or higher number, then it's not a case of setHead
log.Warn(\\\"Transaction pool reset with missing oldhead\\\",
\\\"old\\\", oldHead.Hash(), \\\"oldnum\\\", oldNum, \\\"new\\\", newHead.Hash(), \\\"newnum\\\", newNum)
return
}
// If the reorg ended up on a lower number, it's indicative of setHead being the cause
log.Debug(\\\"Skipping transaction reset caused by setHead\\\",
\\\"old\\\", oldHead.Hash(), \\\"oldnum\\\", oldNum, \\\"new\\\", newHead.Hash(), \\\"newnum\\\", newNum)
// We still need to update the current state s.th. the lost transactions can be readded by the user
} else {
// Walk the old branch back until it is no higher than the new one.
for rem.NumberU64() > add.NumberU64() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error(\\\"Unrooted old chain seen by tx pool\\\", \\\"block\\\", oldHead.Number, \\\"hash\\\", oldHead.Hash())
return
}
}
// Walk the new branch back until it is no higher than the old one.
for add.NumberU64() > rem.NumberU64() {
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error(\\\"Unrooted new chain seen by tx pool\\\", \\\"block\\\", newHead.Number, \\\"hash\\\", newHead.Hash())
return
}
}
// Both branches are at the same height, step back in lockstep until they meet.
for rem.Hash() != add.Hash() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error(\\\"Unrooted old chain seen by tx pool\\\", \\\"block\\\", oldHead.Number, \\\"hash\\\", oldHead.Hash())
return
}
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error(\\\"Unrooted new chain seen by tx pool\\\", \\\"block\\\", newHead.Number, \\\"hash\\\", newHead.Hash())
return
}
}
// Transactions collected from the old branch and the new branch are passed
// to TxDifference to obtain the set that has to be re-injected.
reinject = types.TxDifference(discarded, included)
}
}
}
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock().Header() // Special case during testing
}
statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
log.Error(\\\"Failed to reset txpool state\\\", \\\"err\\\", err)
return
}
pool.currentState = statedb
pool.pendingNonces = newTxNoncer(statedb)
pool.currentMaxGas = newHead.GasLimit
// Inject any transactions discarded due to reorgs
log.Debug(\\\"Reinjecting stale transactions\\\", \\\"count\\\", len(reinject))
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
// Update all fork indicator by next pending block number.
next := new(big.Int).Add(newHead.Number, big.NewInt(1))
pool.istanbul = pool.chainconfig.IsIstanbul(next)
pool.eip2718 = pool.chainconfig.IsBerlin(next)
}
如果老区块不为空且老区块不是新区块的父区块,则检查老区块和新区块之间的高度差是否大于64,如果超过64则不进行重组,否则分别获取旧头和新头对应的区块;如果旧头对应的区块为 null(可能是执行了 setHead 导致旧头被丢弃),则检查新头的高度是否大于等于旧头的高度:若是,说明并非 setHead 所致,打印警告日志并直接 return;若新头的高度更低,则只打印调试日志并继续向下执行,以便更新当前状态;
如果旧头不为 null 则开始进行重组,此时如果旧链的头区块高度大于新链的头区块高度,则旧链向后回退并回收所有回退的交易,如果新链的头区块高度大于旧链的头区块高度,则新链后退并回收交易,当新链和旧链到达同一高度时则同时回退直到找到共同的祖先区块,之后找出所有存储在 discarded 里面但是不在 included 里面的交易,之后将这些交易重新插入到 pool 里面:
// If we\\\'re reorging an old state, reinject all dropped transactions
var reinject types.Transactions
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
// If the reorg is too deep, avoid doing it (will happen during fast sync)
oldNum := oldHead.Number.Uint64()
newNum := newHead.Number.Uint64()
if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
log.Debug(\\\"Skipping deep transaction reorg\\\", \\\"depth\\\", depth)
} else {
// Reorg seems shallow enough to pull in all transactions into memory
var discarded, included types.Transactions
var (
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
)
if rem == nil {
// This can happen if a setHead is performed, where we simply discard the old
// head from the chain.
// If that is the case, we don\\\'t have the lost transactions any more, and
// there\\\'s nothing to add
if newNum >= oldNum {
// If we reorged to a same or higher number, then it\\\'s not a case of setHead
log.Warn(\\\"Transaction pool reset with missing oldhead\\\",
\\\"old\\\", oldHead.Hash(), \\\"oldnum\\\", oldNum, \\\"new\\\", newHead.Hash(), \\\"newnum\\\", newNum)
return
}
// If the reorg ended up on a lower number, it\\\'s indicative of setHead being the cause
log.Debug(\\\"Skipping transaction reset caused by setHead\\\",
\\\"old\\\", oldHead.Hash(), \\\"oldnum\\\", oldNum, \\\"new\\\", newHead.Hash(), \\\"newnum\\\", newNum)
// We still need to update the current state s.th. the lost transactions can be readded by the user
} else {
for rem.NumberU64() > add.NumberU64() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error(\\\"Unrooted old chain seen by tx pool\\\", \\\"block\\\", oldHead.Number, \\\"hash\\\", oldHead.Hash())
return
}
}
for add.NumberU64() > rem.NumberU64() {
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error(\\\"Unrooted new chain seen by tx pool\\\", \\\"block\\\", newHead.Number, \\\"hash\\\", newHead.Hash())
return
}
}
for rem.Hash() != add.Hash() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error(\\\"Unrooted old chain seen by tx pool\\\", \\\"block\\\", oldHead.Number, \\\"hash\\\", oldHead.Hash())
return
}
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error(\\\"Unrooted new chain seen by tx pool\\\", \\\"block\\\", newHead.Number, \\\"hash\\\", newHead.Hash())
return
}
}
reinject = types.TxDifference(discarded, included)
}
}
}
之后设置最新的世界状态、设置新链头区块的状态,然后把旧链回退的交易放入交易池:
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock().Header() // Special case during testing
}
statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
log.Error(\\\"Failed to reset txpool state\\\", \\\"err\\\", err)
return
}
pool.currentState = statedb
pool.pendingNonces = newTxNoncer(statedb)
pool.currentMaxGas = newHead.GasLimit
// Inject any transactions discarded due to reorgs
log.Debug(\\\"Reinjecting stale transactions\\\", \\\"count\\\", len(reinject))
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
// Update all fork indicator by next pending block number.
next := new(big.Int).Add(newHead.Number, big.NewInt(1))
pool.istanbul = pool.chainconfig.IsIstanbul(next)
pool.eip2718 = pool.chainconfig.IsBerlin(next)
文末小结
参考链接
@KS_Blockchain: https://twitter.com/KS_Blockchain
原创文章,作者:七芒星实验室,如若转载,请注明出处:https://www.sudun.com/ask/34228.html