Summary:
Notes on how the InfluxDB TSM storage engine handles the WAL and the cache when writing data.
Core functions:
Engine:WritePoints
// WritePoints writes metadata and point data into the engine.
// It returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(points []models.Point) error {
	values := make(map[string][]Value, len(points))
	var (
		keyBuf    []byte
		baseLen   int
		seriesErr error
	)

	for _, p := range points {
		keyBuf = append(keyBuf[:0], p.Key()...)
		keyBuf = append(keyBuf, keyFieldSeparator...)
		baseLen = len(keyBuf)
		iter := p.FieldIterator()
		t := p.Time().UnixNano()
		for iter.Next() {
			// Skip fields named "time"; they are illegal.
			if bytes.Equal(iter.FieldKey(), timeBytes) {
				continue
			}

			keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)

			if e.seriesTypeMap != nil {
				// Fast-path check to see if the field for the series already exists.
				if v, ok := e.seriesTypeMap.Get(keyBuf); !ok {
					if typ, err := e.Type(keyBuf); err != nil {
						// Field type is unknown, we can try to add it.
					} else if typ != iter.Type() {
						// Existing type is different from what was passed in, we need to drop
						// this write and refresh the series type map.
						seriesErr = tsdb.ErrFieldTypeConflict
						e.seriesTypeMap.Insert(keyBuf, int(typ))
						continue
					}

					// Doesn't exist, so try to insert.
					vv, ok := e.seriesTypeMap.Insert(keyBuf, int(iter.Type()))

					// We didn't insert and the type that exists isn't what we tried to insert, so
					// we have a conflict and must drop this field/series.
					if !ok || vv != int(iter.Type()) {
						seriesErr = tsdb.ErrFieldTypeConflict
						continue
					}
				} else if v != int(iter.Type()) {
					// The series already exists, but with a different type. This is also a type conflict
					// and we need to drop this field/series.
					seriesErr = tsdb.ErrFieldTypeConflict
					continue
				}
			}

			var v Value
			switch iter.Type() {
			case models.Float:
				fv, err := iter.FloatValue()
				if err != nil {
					return err
				}
				v = NewFloatValue(t, fv)
			case models.Integer:
				iv, err := iter.IntegerValue()
				if err != nil {
					return err
				}
				v = NewIntegerValue(t, iv)
			case models.Unsigned:
				iv, err := iter.UnsignedValue()
				if err != nil {
					return err
				}
				v = NewUnsignedValue(t, iv)
			case models.String:
				v = NewStringValue(t, iter.StringValue())
			case models.Boolean:
				bv, err := iter.BooleanValue()
				if err != nil {
					return err
				}
				v = NewBooleanValue(t, bv)
			default:
				return fmt.Errorf("unknown field type for %s: %s", string(iter.FieldKey()), p.String())
			}
			values[string(keyBuf)] = append(values[string(keyBuf)], v)
		}
	}

	e.mu.RLock()
	defer e.mu.RUnlock()

	// first try to write to the cache
	if err := e.Cache.WriteMulti(values); err != nil {
		return err
	}

	if e.WALEnabled {
		if _, err := e.WAL.WriteMulti(values); err != nil {
			return err
		}
	}
	return seriesErr
}
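Each field of a point becomes its own entry in the values map, keyed by the series key plus the field name, joined with keyFieldSeparator ("#!~#", visible in the gdb output further down as "cpu#!~#value"). A minimal, standalone sketch of this key construction (the constant is redeclared here purely for illustration):

package main

import "fmt"

// keyFieldSeparator mirrors the tsm1 constant; it joins the series key
// (measurement + sorted tags) with the field name, exactly as the keyBuf
// appends in WritePoints do.
const keyFieldSeparator = "#!~#"

func main() {
	// A line-protocol point like `cpu,host=A value=0.5,usage_idle=99 1`
	// yields one map entry per field:
	seriesKey := "cpu,host=A"
	for _, field := range []string{"value", "usage_idle"} {
		key := seriesKey + keyFieldSeparator + field
		fmt.Println(key) // cpu,host=A#!~#value, then cpu,host=A#!~#usage_idle
	}
}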
Cache:WriteMulti
// WriteMulti writes the map of keys and associated values to the cache. This
// function is goroutine-safe. It returns an error if the cache would exceed
// its max size by adding the new values. The write attempts to write as many
// values as possible: if one key fails, the others can still succeed, and an
// error will be returned.
func (c *Cache) WriteMulti(values map[string][]Value) error {
	c.init()
	var addedSize uint64
	for _, v := range values {
		addedSize += uint64(Values(v).Size())
	}

	// Enough room in the cache?
	limit := c.maxSize // maxSize is safe for reading without a lock.
	n := c.Size() + addedSize
	if limit > 0 && n > limit {
		atomic.AddInt64(&c.stats.WriteErr, 1)
		return ErrCacheMemorySizeLimitExceeded(n, limit)
	}

	var werr error
	c.mu.RLock()
	store := c.store
	c.mu.RUnlock()

	// We'll optimistically set size here, and then decrement it for write errors.
	c.increaseSize(addedSize)
	for k, v := range values {
		newKey, err := store.write([]byte(k), v)
		if err != nil {
			// The write failed, hold onto the error and adjust the size delta.
			werr = err
			addedSize -= uint64(Values(v).Size())
			c.decreaseSize(uint64(Values(v).Size()))
		}
		if newKey {
			addedSize += uint64(len(k))
			c.increaseSize(uint64(len(k)))
		}
	}

	// Some points in the batch were dropped. An error is returned so
	// the error stat is incremented as well.
	if werr != nil {
		atomic.AddInt64(&c.stats.WriteDropped, 1)
		atomic.AddInt64(&c.stats.WriteErr, 1)
	}

	// Update the memory size stat.
	c.updateMemSize(int64(addedSize))
	atomic.AddInt64(&c.stats.WriteOK, 1)

	c.mu.Lock()
	c.lastWriteTime = time.Now()
	c.mu.Unlock()

	return werr
}
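Note the sizing strategy: the full batch size is reserved up front with increaseSize, and only the contribution of keys whose store.write failed is subtracted back. A standalone sketch of this optimistic accounting pattern (the sizer type and write stub are illustrative, not the tsm1 API):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// sizer mimics the cache's lock-free size counter.
type sizer struct{ size uint64 }

func (s *sizer) increaseSize(d uint64) { atomic.AddUint64(&s.size, d) }
func (s *sizer) decreaseSize(d uint64) { atomic.AddUint64(&s.size, ^(d - 1)) } // atomic subtract

func write(key string) error { // stand-in for store.write
	if key == "mem#!~#used" {
		return errors.New("write failed")
	}
	return nil
}

func main() {
	batch := map[string]uint64{"cpu#!~#value": 64, "mem#!~#used": 32}

	var total uint64
	for _, sz := range batch {
		total += sz
	}

	var s sizer
	s.increaseSize(total) // optimistic: assume the whole batch lands
	for k, sz := range batch {
		if err := write(k); err != nil {
			s.decreaseSize(sz) // roll back only the failed key's share
		}
	}
	fmt.Println(atomic.LoadUint64(&s.size)) // 64: only one key succeeded
}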
WAL:WriteMulti
// WriteMulti writes the given values to the WAL. It returns the WAL segment ID to
// which the points were written. If an error is returned the segment ID should
// be ignored.
func (l *WAL) WriteMulti(values map[string][]Value) (int, error) {
	entry := &WriteWALEntry{
		Values: values,
	}

	id, err := l.writeToLog(entry)
	if err != nil {
		atomic.AddInt64(&l.stats.WriteErr, 1)
		return -1, err
	}
	atomic.AddInt64(&l.stats.WriteOK, 1)

	return id, nil
}

func (l *WAL) writeToLog(entry WALEntry) (int, error) {
	// limit how many concurrent encodings can be in flight. Since we can only
	// write one at a time to disk, a slow disk can cause the allocations below
	// to increase quickly. If we're backed up, wait until others have completed.
	bytes := bytesPool.Get(entry.MarshalSize())

	b, err := entry.Encode(bytes)
	if err != nil {
		bytesPool.Put(bytes)
		return -1, err
	}

	encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))
	compressed := snappy.Encode(encBuf, b)
	bytesPool.Put(bytes)

	syncErr := make(chan error)

	segID, err := func() (int, error) {
		l.mu.Lock()
		defer l.mu.Unlock()

		// Make sure the log has not been closed
		select {
		case <-l.closing:
			return -1, ErrWALClosed
		default:
		}

		// roll the segment file if needed
		if err := l.rollSegment(); err != nil {
			return -1, fmt.Errorf("error rolling WAL segment: %v", err)
		}

		// write and sync
		if err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil {
			return -1, fmt.Errorf("error writing WAL entry: %v", err)
		}

		select {
		case l.syncWaiters <- syncErr:
		default:
			return -1, fmt.Errorf("error syncing wal")
		}
		l.scheduleSync()

		// Update stats for current segment size
		atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))

		l.lastWriteTime = time.Now().UTC()

		return l.currentSegmentID, nil
	}()

	bytesPool.Put(encBuf)

	if err != nil {
		return segID, err
	}

	// schedule an fsync and wait for it to complete
	return segID, <-syncErr
}
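Note that writeToLog itself never calls fsync: it registers a private chan error on l.syncWaiters, requests a sync via scheduleSync, releases the lock, and then blocks on <-syncErr, so a single fsync can acknowledge many concurrent writers at once. A standalone sketch of that fan-out pattern (names and the fsync stub are illustrative, not the tsm1 implementation):

package main

import (
	"fmt"
	"sync"
)

func fsync() error { return nil } // stand-in for the segment file's Sync()

func main() {
	syncWaiters := make(chan chan error, 1024)

	// The syncer: collect every pending waiter, fsync once, then deliver
	// the shared result to each of them.
	go func() {
		for first := range syncWaiters {
			waiters := []chan error{first}
		drain:
			for {
				select {
				case w := <-syncWaiters:
					waiters = append(waiters, w)
				default:
					break drain
				}
			}
			err := fsync() // one disk sync covers the whole batch
			for _, w := range waiters {
				w <- err
			}
		}
	}()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			syncErr := make(chan error)
			syncWaiters <- syncErr // register, as writeToLog does
			fmt.Println("writer", i, "synced:", <-syncErr)
		}(i)
	}
	wg.Wait()
}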
// Write writes entryType and the buffer containing compressed entry data.
func (w *WALSegmentWriter) Write(entryType WalEntryType, compressed []byte) error {
	var buf [5]byte
	buf[0] = byte(entryType)
	binary.BigEndian.PutUint32(buf[1:5], uint32(len(compressed)))

	if _, err := w.bw.Write(buf[:]); err != nil {
		return err
	}

	if _, err := w.bw.Write(compressed); err != nil {
		return err
	}

	w.size += len(buf) + len(compressed)

	return nil
}
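On disk, every segment entry is therefore framed as a 1-byte entry type, a 4-byte big-endian payload length, and the snappy-compressed entry bytes. A sketch of the inverse read path under those framing rules (readEntry is illustrative, not tsm1's WALSegmentReader):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"

	"github.com/golang/snappy"
)

// readEntry undoes WALSegmentWriter.Write: read the 5-byte header
// (type + length), then the compressed body, then snappy-decompress.
func readEntry(r io.Reader) (entryType byte, payload []byte, err error) {
	var hdr [5]byte
	if _, err = io.ReadFull(r, hdr[:]); err != nil {
		return 0, nil, err
	}
	compressed := make([]byte, binary.BigEndian.Uint32(hdr[1:5]))
	if _, err = io.ReadFull(r, compressed); err != nil {
		return 0, nil, err
	}
	payload, err = snappy.Decode(nil, compressed)
	return hdr[0], payload, err
}

func main() {
	// Frame a fake entry exactly the way Write does, then read it back.
	body := snappy.Encode(nil, []byte("example entry bytes"))
	var seg bytes.Buffer
	seg.WriteByte(1) // entry type byte
	var lenBuf [4]byte
	binary.BigEndian.PutUint32(lenBuf[:], uint32(len(body)))
	seg.Write(lenBuf[:])
	seg.Write(body)

	typ, payload, err := readEntry(&seg)
	fmt.Println(typ, string(payload), err) // 1 example entry bytes <nil>
}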
WAL entry serialization:
// Encode converts the WriteWALEntry into a byte stream using dst if it
// is large enough. If dst is too small, the slice will be grown to fit the
// encoded entry.
func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
	// The entry's values are encoded as follows:
	//
	// For each key and slice of values, first a 1 byte type for the []Values
	// slice is written. Following the type, the length and key bytes are written.
	// Following the key, a 4 byte count followed by each value as an 8 byte time
	// and N byte value. The value is dependent on the type being encoded. float64,
	// int64, use 8 bytes, boolean uses 1 byte, and string is similar to the key encoding,
	// except that string values have a 4-byte length, and keys only use 2 bytes.
	//
	// This structure is then repeated for each key and value slice.
	//
	// ┌────────────────────────────────────────────────────────────────────┐
	// │                           WriteWALEntry                            │
	// ├──────┬─────────┬────────┬───────┬─────────┬─────────┬───┬──────┬───┤
	// │ Type │ Key Len │  Key   │ Count │  Time   │  Value  │...│ Type │...│
	// │1 byte│ 2 bytes │ N bytes│4 bytes│ 8 bytes │ N bytes │   │1 byte│   │
	// └──────┴─────────┴────────┴───────┴─────────┴─────────┴───┴──────┴───┘
	encLen := w.MarshalSize() // Type (1), Key Length (2), and Count (4) for each key

	// allocate or re-slice to correct size
	if len(dst) < encLen {
		dst = make([]byte, encLen)
	} else {
		dst = dst[:encLen]
	}

	// Finally, encode the entry
	var n int
	var curType byte

	for k, v := range w.Values {
		switch v[0].(type) {
		case FloatValue:
			curType = float64EntryType
		case IntegerValue:
			curType = integerEntryType
		case UnsignedValue:
			curType = unsignedEntryType
		case BooleanValue:
			curType = booleanEntryType
		case StringValue:
			curType = stringEntryType
		default:
			return nil, fmt.Errorf("unsupported value type: %T", v[0])
		}
		dst[n] = curType
		n++

		binary.BigEndian.PutUint16(dst[n:n+2], uint16(len(k)))
		n += 2
		n += copy(dst[n:], k)

		binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(v)))
		n += 4

		for _, vv := range v {
			binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.UnixNano()))
			n += 8

			switch vv := vv.(type) {
			case FloatValue:
				if curType != float64EntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				}
				binary.BigEndian.PutUint64(dst[n:n+8], math.Float64bits(vv.value))
				n += 8
			case IntegerValue:
				if curType != integerEntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				}
				binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
				n += 8
			case UnsignedValue:
				if curType != unsignedEntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				}
				binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
				n += 8
			case BooleanValue:
				if curType != booleanEntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				}
				if vv.value {
					dst[n] = 1
				} else {
					dst[n] = 0
				}
				n++
			case StringValue:
				if curType != stringEntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				}
				binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(vv.value)))
				n += 4
				n += copy(dst[n:], vv.value)
			default:
				return nil, fmt.Errorf("unsupported value found in %T slice: %T", v[0].Value(), vv)
			}
		}
	}
	return dst[:n], nil
}
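Decoding follows the same layout in reverse. Below is a standalone sketch that decodes one float64 block (type byte, 2-byte key length, key, 4-byte count, then 8-byte time + 8-byte value pairs); decodeFloatBlock and the assumption that the float64 entry type encodes as 0 are for illustration only, not tsm1's actual MarshalBinary/UnmarshalBinary:

package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// decodeFloatBlock reads one key's block per the layout documented above.
func decodeFloatBlock(b []byte) (key string, times []int64, vals []float64, rest []byte, err error) {
	if b[0] != 0 { // this sketch assumes float64EntryType == 0
		return "", nil, nil, nil, fmt.Errorf("not a float64 block: %d", b[0])
	}
	n := 1
	keyLen := int(binary.BigEndian.Uint16(b[n : n+2]))
	n += 2
	key = string(b[n : n+keyLen])
	n += keyLen
	count := int(binary.BigEndian.Uint32(b[n : n+4]))
	n += 4
	for i := 0; i < count; i++ {
		times = append(times, int64(binary.BigEndian.Uint64(b[n:n+8])))
		vals = append(vals, math.Float64frombits(binary.BigEndian.Uint64(b[n+8:n+16])))
		n += 16
	}
	return key, times, vals, b[n:], nil
}

func main() {
	// Hand-encode one block the same way Encode does for a FloatValue slice.
	key := "cpu#!~#value"
	buf := []byte{0} // assumed float64 entry type
	buf = binary.BigEndian.AppendUint16(buf, uint16(len(key)))
	buf = append(buf, key...)
	buf = binary.BigEndian.AppendUint32(buf, 1) // count: one value
	buf = binary.BigEndian.AppendUint64(buf, uint64(1625000000000000000))
	buf = binary.BigEndian.AppendUint64(buf, math.Float64bits(0.5))

	k, ts, vs, _, err := decodeFloatBlock(buf)
	fmt.Println(k, ts, vs, err) // cpu#!~#value [1625000000000000000] [0.5] <nil>
}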
As the code shows, the engine writes to the cache first, i.e., into memory, and only then to the WAL file:
	// first try to write to the cache
	if err := e.Cache.WriteMulti(values); err != nil {
		return err
	}

	if e.WALEnabled {
		if _, err := e.WAL.WriteMulti(values); err != nil {
			return err
		}
	}
Call stack:
(gdb) p keyBuf
$1 = {array = 0xc000ab61a0 "cpu#!~#value", len = 12, cap = 16}
(gdb) bt
#0 influxdb.cluster/tsdb/engine/tsm1.(*Engine).WritePoints (e=0xc0006418c0, points=..., ~r1=...) at /root/work/influxdb-1.8.4/tsdb/engine/tsm1/engine.go:1340
#1 0x0000000000d71c3e in influxdb.cluster/tsdb.(*Shard).WritePoints (s=0xc0000cf8c0, points=..., ~r1=...) at /root/work/influxdb-1.8.4/tsdb/shard.go:525
#2 0x0000000000d951ee in influxdb.cluster/tsdb.(*Store).WriteToShard (s=0xc0000f6c00, shardID=7, points=..., ~r2=...) at /root/work/influxdb-1.8.4/tsdb/store.go:1395
#3 0x0000000000db59b9 in influxdb.cluster/coordinator.(*PointsWriter).writeToShard.func1 (shardID=7, owner=..., points=...) at /root/work/influxdb-1.8.4/coordinator/points_writer.go:443
#4 0x0000000000db5712 in influxdb.cluster/coordinator.(*PointsWriter).writeToShard·dwrap·8 () at /root/work/influxdb-1.8.4/coordinator/points_writer.go:470
#5 0x0000000000470ac1 in runtime.goexit () at /usr/local/go/src/runtime/asm_amd64.s:1581
Question:
- If the write to the cache succeeds but the write to the WAL file fails, the whole write is treated as failed. Since the cache is rebuilt by replaying the WAL on restart, values that reached only the cache would not survive a crash. Is this strategy therefore left to the client, which is expected to retry the write?