batch_tx.go (etcd-3.5.5) | : | batch_tx.go (etcd-3.5.6) | ||
---|---|---|---|---|
skipping to change at line 332 | skipping to change at line 332 | |||
t.Unlock() | t.Unlock() | |||
} | } | |||
func (t *batchTxBuffered) CommitAndStop() { | func (t *batchTxBuffered) CommitAndStop() { | |||
t.lock() | t.lock() | |||
t.commit(true) | t.commit(true) | |||
t.Unlock() | t.Unlock() | |||
} | } | |||
func (t *batchTxBuffered) commit(stop bool) { | func (t *batchTxBuffered) commit(stop bool) { | |||
if t.backend.hooks != nil { | ||||
t.backend.hooks.OnPreCommitUnsafe(t) | ||||
} | ||||
// all read txs must be closed to acquire boltdb commit rwlock | // all read txs must be closed to acquire boltdb commit rwlock | |||
t.backend.readTx.Lock() | t.backend.readTx.Lock() | |||
t.unsafeCommit(stop) | t.unsafeCommit(stop) | |||
t.backend.readTx.Unlock() | t.backend.readTx.Unlock() | |||
} | } | |||
func (t *batchTxBuffered) unsafeCommit(stop bool) { | func (t *batchTxBuffered) unsafeCommit(stop bool) { | |||
if t.backend.hooks != nil { | ||||
t.backend.hooks.OnPreCommitUnsafe(t) | ||||
} | ||||
if t.backend.readTx.tx != nil { | if t.backend.readTx.tx != nil { | |||
// wait all store read transactions using the current boltdb tx t o finish, | // wait all store read transactions using the current boltdb tx t o finish, | |||
// then close the boltdb tx | // then close the boltdb tx | |||
go func(tx *bolt.Tx, wg *sync.WaitGroup) { | go func(tx *bolt.Tx, wg *sync.WaitGroup) { | |||
wg.Wait() | wg.Wait() | |||
if err := tx.Rollback(); err != nil { | if err := tx.Rollback(); err != nil { | |||
t.backend.lg.Fatal("failed to rollback tx", zap.E rror(err)) | t.backend.lg.Fatal("failed to rollback tx", zap.E rror(err)) | |||
} | } | |||
}(t.backend.readTx.tx, t.backend.readTx.txWg) | }(t.backend.readTx.tx, t.backend.readTx.txWg) | |||
t.backend.readTx.reset() | t.backend.readTx.reset() | |||
End of changes. 2 change blocks. | ||||
4 lines changed or deleted | 3 lines changed or added |