1: 背景
由于需要将mongodb的相关链路数据上报至opentelemetry-collector,根据mongo-driver文档说明,初始化Monitor并实现Started、Succeeded、Failed三个接口,即可捕获相关链路数据。由于span在Started中创建,而要到Succeeded或Failed回调时才能结束,因此需要一个缓存在两次回调之间暂存尚未结束的span。
2: 要求
· 线程安全
· 支持设置最大缓存数量;达到上限后,淘汰最久未写入的数据(下文实现为FIFO,读取不会刷新数据位置,并非严格的LRU)
· 支持设置缓存数据的最长存活时间,超过存活时间的数据会被定时清理
· 支持设置数据从缓存中淘汰时触发的回调函数
· 支持数据冲突时,更新数据位置以及过期时间
3: 代码实现
package mongodriver
import (
"container/list"
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsontype"
"go.mongodb.org/mongo-driver/event"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
// FIFOCache is a FIFO cache intended to be safe for concurrent use, with
// periodic removal of expired entries. Entries live in a linked list ordered by
// write time (newest at the front) and are indexed by key through a map.
type FIFOCache struct {
// MaxEntries is the maximum number of entries to keep. Set only evicts by
// size when this value is non-zero.
MaxEntries int
// OnEvictedCallback, when non-nil, is invoked for each entry that is removed
// from the cache. reason is one of the RemoveReason* constants.
OnEvictedCallback func(key Key, value interface{}, reason string, autoFinished bool)
// ll records insertion/update order: new and updated entries are moved to
// the front, so the oldest entry is at the back.
ll *list.List
// cache maps a key to its element in ll.
cache map[interface{}]*list.Element
// lock guards ll and cache.
lock sync.RWMutex
// ttlInMinute is the entry lifetime in minutes; values <= 0 disable expiry.
ttlInMinute int32
// done is closed by Close to stop the background cleanup goroutine.
done chan struct{}
}
// Reasons reported to OnEvictedCallback when an entry leaves the cache, plus
// the span attribute key under which the default callback records the reason.
const (
RemoveReasonExpired = "EXPIRED" // removed because the entry's TTL elapsed
RemoveReasonExplicit = "EXPLICIT" // removed by a regular, explicit removal
RemoveReasonSize = "MAX_SIZE" // removed because the cache exceeded its capacity
RemoveReasonCleanAll = "CLEAN_ALL" // removed because all data was cleared
defaultEvictedSpanKey = "evicted" // key of the attribute written to the span on removal
)
// Key is the type of a cache key. Keys are used as map keys, so the dynamic
// value stored in a Key must be comparable.
type Key interface{}
// Entry is the value stored in each list element of a FIFOCache.
type Entry struct {
// key is kept so the map slot can be deleted when the element is removed.
key Key
// value is the cached payload.
value interface{}
ttl time.Time // expiry time of the entry; the zero value means it never expires
}
// NewFIFOCache creates a new cache.
//
// maxEntries is the maximum number of entries to keep. It must not be
// negative (the function panics otherwise); 0 disables the size limit, because
// Set only evicts by size when MaxEntries is non-zero.
//
// ttlInMinute is the maximum lifetime of a cached entry, in minutes. A value
// of 0 (or less) disables expiry and no background goroutine is started. Very
// small values are discouraged; if you need one, reconsider whether this cache
// fits the use case.
//
// The cache is created with a default OnEvictedCallback that, when asked to
// auto-finish and the value is a trace.Span, tags the span with the removal
// reason and ends it.
//
// When expiry is enabled, a goroutine sweeps expired entries every 10 seconds
// until Close is called.
func NewFIFOCache(maxEntries int, ttlInMinute int32) *FIFOCache {
	if maxEntries < 0 {
		panic("maxEntries must not be negative")
	}
	cache := &FIFOCache{
		MaxEntries: maxEntries,
		OnEvictedCallback: func(key Key, value interface{}, reason string, autoFinished bool) {
			if !autoFinished {
				// The caller takes care of finishing the span itself.
				return
			}
			span, ok := value.(trace.Span)
			if !ok {
				return
			}
			span.SetAttributes(attribute.String(defaultEvictedSpanKey, reason))
			span.End()
		},
		ll:          list.New(),
		cache:       make(map[interface{}]*list.Element, maxEntries),
		done:        make(chan struct{}),
		ttlInMinute: ttlInMinute,
	}
	if ttlInMinute > 0 {
		go func() {
			// A single ticker replaces a fresh time.After timer per iteration.
			ticker := time.NewTicker(10 * time.Second)
			defer ticker.Stop()
			for {
				select {
				case <-cache.done:
					println("stop cache goroutine")
					return
				case <-ticker.C:
					cache.cleanInterval(RemoveReasonExpired)
				}
			}
		}()
	}
	return cache
}
// Set stores value under key.
//
// If the key is already cached, its element is moved to the front of the list,
// the value is replaced and, when expiry is enabled, the expiry time is
// refreshed. Otherwise a new entry is pushed to the front and, if the cache has
// a non-zero MaxEntries that is now exceeded, the oldest entry is evicted with
// RemoveReasonSize.
func (c *FIFOCache) Set(key Key, value interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()

	ttlEnabled := c.ttlInMinute > 0
	lifetime := time.Duration(c.ttlInMinute) * time.Minute

	if existing, found := c.cache[key]; found {
		c.ll.MoveToFront(existing)
		entry, isEntry := existing.Value.(*Entry)
		if !isEntry {
			return
		}
		entry.value = value
		if ttlEnabled {
			entry.ttl = time.Now().Add(lifetime)
		}
		return
	}

	fresh := &Entry{key: key, value: value}
	if ttlEnabled {
		fresh.ttl = time.Now().Add(lifetime)
	}
	c.cache[key] = c.ll.PushFront(fresh)

	if c.MaxEntries == 0 || c.ll.Len() <= c.MaxEntries {
		return
	}
	c.removeOldest(RemoveReasonSize)
}
// Get returns the value cached under key and true, or nil and false when the
// key is absent (or the stored element does not hold an *Entry).
//
// Get does not modify the cache, so it only takes the read lock and concurrent
// readers do not serialize each other. It does not refresh the entry's position
// or expiry time, and it does not check whether the entry has already expired.
func (c *FIFOCache) Get(key Key) (value interface{}, ok bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	element, hit := c.cache[key]
	if !hit {
		return nil, false
	}
	entry, isEntry := element.Value.(*Entry)
	if !isEntry {
		return nil, false
	}
	return entry.value, true
}
// Remove deletes the entry stored under key.
//
// key is the cache key, reason is the removal reason (see the RemoveReason*
// constants) and autoFinish is forwarded to OnEvictedCallback; with the default
// callback, true means the span is tagged and ended automatically.
//
// It returns the removed value and true when the key was present, otherwise
// nil and false.
func (c *FIFOCache) Remove(key Key, reason string, autoFinish bool) (value interface{}, ok bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	element, hit := c.cache[key]
	if !hit {
		return nil, false
	}
	c.removeElement(element, reason, autoFinish)

	// The element keeps its Value after being unlinked from the list.
	entry, isEntry := element.Value.(*Entry)
	if !isEntry {
		return nil, false
	}
	return entry.value, true
}
// removeOldest evicts the entry at the back of the list (the oldest one), if
// any, asking the eviction callback to auto-finish it. The caller must hold
// the write lock.
func (c *FIFOCache) removeOldest(reason string) {
	if tail := c.ll.Back(); tail != nil {
		c.removeElement(tail, reason, true)
	}
}
// removeElement unlinks e from the list, deletes its key from the map and, if
// a callback is configured, reports the removal with the given reason and
// auto-finish flag. Elements whose value is not an *Entry are only unlinked.
// The caller must hold the write lock.
func (c *FIFOCache) removeElement(e *list.Element, reason string, autoFinished bool) {
	// list.Remove hands back the element's value.
	entry, isEntry := c.ll.Remove(e).(*Entry)
	if !isEntry {
		return
	}
	delete(c.cache, entry.key)
	if notify := c.OnEvictedCallback; notify != nil {
		notify(entry.key, entry.value, reason, autoFinished)
	}
}
// Len returns the number of entries currently cached, or 0 when the cache has
// not been initialized.
//
// It takes the read lock because ClearAll replaces both the list and the map;
// reading them unlocked would race with concurrent writers.
func (c *FIFOCache) Len() int {
	c.lock.RLock()
	defer c.lock.RUnlock()

	if c.cache == nil || c.ll == nil {
		return 0
	}
	return c.ll.Len()
}
// ClearAll empties the cache. When a callback is configured, every cached
// entry is first reported to it with RemoveReasonCleanAll and auto-finish set;
// afterwards the list and the map are replaced with fresh, empty ones.
func (c *FIFOCache) ClearAll() {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.OnEvictedCallback != nil {
		for _, element := range c.cache {
			entry, isEntry := element.Value.(*Entry)
			if !isEntry {
				continue
			}
			c.OnEvictedCallback(entry.key, entry.value, RemoveReasonCleanAll, true)
		}
	}

	c.cache = make(map[interface{}]*list.Element)
	c.ll = list.New()
}
// Close stops the background cleanup goroutine by closing the done channel.
//
// Close is idempotent: calling it again after the channel has been closed, or
// on a cache whose done channel was never created, is a no-op rather than a
// "close of closed channel" / "close of nil channel" panic.
func (c *FIFOCache) Close() {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.done == nil {
		return
	}
	select {
	case <-c.done:
		// Already closed by an earlier call.
	default:
		close(c.done)
	}
}
// cleanInterval removes expired entries; it should not be called frequently.
//
// It walks from the back of the list (the oldest entries) and removes each
// entry whose expiry time has passed, stopping at the first entry that has no
// expiry time, has not expired yet, or is not an *Entry.
func (c *FIFOCache) cleanInterval(reason string) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for tail := c.ll.Back(); tail != nil; tail = c.ll.Back() {
		entry, isEntry := tail.Value.(*Entry)
		if !isEntry || entry.ttl.IsZero() || time.Until(entry.ttl) >= 0 {
			// First non-expired node: everything in front of it is newer.
			return
		}
		c.removeElement(tail, reason, true)
	}
}
// Tracer name and default cache sizing used by the monitor.
const (
// TracerKey is the name passed to the tracer provider when creating the tracer.
TracerKey = "gitlab.ctyun.cn/apm/apm-sdk/sdk-go/sql/mongodriver"
defaultTTLInMinute int32 = 10 // maximum cache lifetime in minutes; spans alive for more than 10 minutes are cleaned up
defaultMaxEntries = 10 * 10000 // cache at most 100,000 spans, roughly 133MB of data (original author's estimate)
)
// SpanKey is the cache key under which the span of an in-flight command is
// stored; it is built from the event's ConnectionID and RequestID.
type SpanKey struct {
ConnectionID string
RequestID int64
}
// monitor implements the command monitor callbacks and keeps the spans of
// started commands until they finish.
type monitor struct {
// cache holds the spans created in Started, keyed by SpanKey.
cache *FIFOCache
}
// NewMonitor creates a new mongodb event CommandMonitor whose callbacks are
// backed by a span cache built with the default capacity and TTL.
func NewMonitor() *event.CommandMonitor {
	m := &monitor{
		cache: NewFIFOCache(defaultMaxEntries, defaultTTLInMinute),
	}

	hooks := new(event.CommandMonitor)
	hooks.Started = m.Started
	hooks.Succeeded = m.Succeeded
	hooks.Failed = m.Failed
	return hooks
}
// Started handles a command-started event: it starts a client span named
// "<collection>.<command>" (or just "<command>" when no collection name can be
// extracted), annotated with database and peer attributes, and caches the span
// under the event's connection and request IDs so it can be ended when the
// command finishes.
func (m *monitor) Started(ctx context.Context, evt *event.CommandStartedEvent) {
	host, port := m.peerInfo(evt)
	attrs := []attribute.KeyValue{
		semconv.DBSystemMongoDB,
		semconv.DBOperation(evt.CommandName),
		semconv.DBName(evt.DatabaseName),
		semconv.NetPeerName(host),
		semconv.NetPeerPort(port),
		semconv.NetTransportTCP,
	}

	spanName := evt.CommandName
	if collection, err := m.extractCollection(evt); err == nil && collection != "" {
		attrs = append(attrs, semconv.DBMongoDBCollection(collection))
		spanName = collection + "." + evt.CommandName
	}

	tracer := otel.GetTracerProvider().Tracer(TracerKey)
	_, span := tracer.Start(
		ctx,
		spanName,
		trace.WithSpanKind(trace.SpanKindClient),
		trace.WithAttributes(attrs...),
	)

	m.cache.Set(SpanKey{ConnectionID: evt.ConnectionID, RequestID: evt.RequestID}, span)
}
// Succeeded handles a command-succeeded event by finishing the matching span
// without an error.
func (m *monitor) Succeeded(ctx context.Context, evt *event.CommandSucceededEvent) {
	finished := &evt.CommandFinishedEvent
	m.Finished(ctx, finished, nil)
}
// Failed handles a command-failed event by finishing the matching span with an
// error built from the event's Failure.
func (m *monitor) Failed(ctx context.Context, evt *event.CommandFailedEvent) {
	finished := &evt.CommandFinishedEvent
	failure := fmt.Errorf("%s", evt.Failure)
	m.Finished(ctx, finished, failure)
}
// Finished ends the span that was cached for the finished command.
//
// The span is taken out of the cache with RemoveReasonExplicit and auto-finish
// disabled, so this method ends it itself. When err is non-nil the span status
// is set to Error with the error's message first. Nothing happens if no span is
// cached for the event or the cached value is not a trace.Span.
func (m *monitor) Finished(ctx context.Context, evt *event.CommandFinishedEvent, err error) {
	cached, found := m.cache.Remove(
		SpanKey{ConnectionID: evt.ConnectionID, RequestID: evt.RequestID},
		RemoveReasonExplicit,
		false,
	)
	if !found {
		return
	}

	if span, isSpan := cached.(trace.Span); isSpan {
		if err != nil {
			span.SetStatus(codes.Error, err.Error())
		}
		span.End()
	}
}
// extractCollection reads the collection name from the first element of the
// command document.
//
// When the first element's key equals the command name and its value is a
// string, that string is returned. If the value cannot be read, the read error
// is returned; if it is not a string, an empty name and a nil error are
// returned. In every other case the result is a "collection name not found"
// error (or the error from reading the first element).
func (m *monitor) extractCollection(evt *event.CommandStartedEvent) (string, error) {
	first, err := evt.Command.IndexErr(0)
	if err != nil {
		return "", err
	}

	name, keyErr := first.KeyErr()
	if keyErr != nil || name != evt.CommandName {
		return "", fmt.Errorf("collection name not found")
	}

	var raw bson.RawValue
	raw, err = first.ValueErr()
	switch {
	case err != nil:
		return "", err
	case raw.Type != bsontype.String:
		return "", nil
	}
	return raw.StringValue(), nil
}
// peerInfo derives the peer host name and port from the event's ConnectionID.
//
// Everything from the first '[' onwards is dropped, then the remainder is
// split at the first ':' into host and port. The port defaults to 27017 when
// the ID carries no port or the port is not a valid integer; an unparsable
// port no longer overwrites the default with 0.
//
// NOTE(review): this presumably expects IDs of the form "host:port[-n]";
// confirm against the driver. IPv6 literals are not handled.
func (m *monitor) peerInfo(evt *event.CommandStartedEvent) (hostname string, port int) {
	hostname = evt.ConnectionID
	port = 27017
	if idx := strings.IndexByte(hostname, '['); idx >= 0 {
		hostname = hostname[:idx]
	}
	if idx := strings.IndexByte(hostname, ':'); idx >= 0 {
		if p, err := strconv.Atoi(hostname[idx+1:]); err == nil {
			port = p
		}
		hostname = hostname[:idx]
	}
	return hostname, port
}