searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

基于LRU算法实现mongodb的monitor中间件上报trace数据

2023-05-23 10:45:58
30
0

1: 背景

     由于需要将 MongoDB 的链路数据上报至 opentelemetry-collector,根据 mongo-driver 文档说明,只需初始化 CommandMonitor 并实现 Started、Succeeded、Failed 三个回调函数,即可捕获相关链路数据。

2: 要求

     · 线程安全

     · 支持设置缓存的最大条目数,达到上限后按 LRU 策略淘汰数据

     · 支持缓存数据最长存活时间设置,达到时间的缓存数据进行清理

     · 支持设置数据从缓存中淘汰时的处理策略(回调函数)

     · 支持数据冲突时,更新数据位置以及过期时间

3: 代码实现

package mongodriver

import (
	"container/list"
	"context"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/bsontype"
	"go.mongodb.org/mongo-driver/event"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
)

// FIFOCache is a FIFO cache intended for concurrent use. Entries are kept in
// a linked list ordered by when they were last written, and a background
// goroutine started by NewFIFOCache periodically removes expired entries
// when a TTL is configured.
type FIFOCache struct {
	// MaxEntries is the maximum number of entries to keep. Set skips the
	// size check when it is 0.
	MaxEntries int
	// OnEvictedCallback is invoked whenever an entry is removed from the cache.
	// It is called while the cache lock is held.
	OnEvictedCallback func(key Key, value interface{}, reason string, autoFinished bool)
	// ll records entry ordering: most recently written at the front, oldest at the back.
	ll    *list.List
	cache map[interface{}]*list.Element

	lock        sync.RWMutex
	ttlInMinute int32
	done        chan struct{}
}

// Reasons passed to OnEvictedCallback describing why an entry left the cache.
const (
	RemoveReasonExpired  = "EXPIRED"   // removed because the entry's TTL elapsed
	RemoveReasonExplicit = "EXPLICIT"  // removed through the normal path (explicit removal)
	RemoveReasonSize     = "MAX_SIZE"  // removed because the cache exceeded its capacity
	RemoveReasonCleanAll = "CLEAN_ALL" // removed because all data was cleared

	defaultEvictedSpanKey = "evicted" // attribute key written to the span when a removal is triggered
)

// Key is the type of cache keys; it is used as a map key by FIFOCache.
type Key interface{}

// Entry is the payload stored in each linked-list element of a FIFOCache.
type Entry struct {
	key   Key
	value interface{}
	ttl   time.Time // expiry time of the entry; the zero value means no expiry
}

// NewFIFOCache creates a new cache.
//
// maxEntries is the maximum number of entries to keep. It must not be
// negative; 0 disables the size limit (Set skips the size check when
// MaxEntries is 0).
//
// ttlInMinute is the maximum lifetime of a cached entry, in minutes. A value
// of 0 (or less) disables expiry. When expiry is enabled, a background
// goroutine sweeps expired entries every 10 seconds until Close is called,
// so very small TTLs are not a good fit for this cache.
//
// The default OnEvictedCallback, when autoFinished is true and the value is a
// trace.Span, tags the span with the eviction reason and ends it.
//
// NewFIFOCache panics if maxEntries is negative.
func NewFIFOCache(maxEntries int, ttlInMinute int32) *FIFOCache {
	if maxEntries < 0 {
		panic("maxEntries must not be negative")
	}

	cache := &FIFOCache{
		MaxEntries: maxEntries,
		OnEvictedCallback: func(key Key, value interface{}, reason string, autoFinished bool) {
			if !autoFinished {
				// The caller takes responsibility for finishing the span.
				return
			}
			span, ok := value.(trace.Span)
			if !ok {
				return
			}
			span.SetAttributes(attribute.String(defaultEvictedSpanKey, reason))
			span.End()
		},
		ll:          list.New(),
		cache:       make(map[interface{}]*list.Element, maxEntries),
		done:        make(chan struct{}),
		ttlInMinute: ttlInMinute,
	}

	if ttlInMinute > 0 {
		go func() {
			// A single ticker avoids allocating a fresh timer on every loop
			// iteration, which time.After would do.
			ticker := time.NewTicker(10 * time.Second)
			defer ticker.Stop()

			for {
				select {
				case <-cache.done:
					println("stop cache goroutine")
					return
				case <-ticker.C:
					cache.cleanInterval(RemoveReasonExpired)
				}
			}
		}()
	}

	return cache
}

// Set stores value under key. If the key is already present, its value is
// overwritten, its expiry (when a TTL is configured) is renewed and the entry
// is moved to the front of the list. Otherwise a new entry is pushed to the
// front, and the oldest entry is evicted if the cache has grown past
// MaxEntries.
func (c *FIFOCache) Set(key Key, value interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()

	existing, found := c.cache[key]
	if !found {
		fresh := &Entry{key: key, value: value}
		if c.ttlInMinute > 0 {
			fresh.ttl = time.Now().Add(time.Minute * time.Duration(c.ttlInMinute))
		}
		c.cache[key] = c.ll.PushFront(fresh)

		// A MaxEntries of 0 means the size is not limited.
		if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
			c.removeOldest(RemoveReasonSize)
		}
		return
	}

	// Key collision: refresh position, value and expiry of the stored entry.
	c.ll.MoveToFront(existing)
	entry, isEntry := existing.Value.(*Entry)
	if !isEntry {
		return
	}
	entry.value = value
	if c.ttlInMinute > 0 {
		entry.ttl = time.Now().Add(time.Minute * time.Duration(c.ttlInMinute))
	}
}

// Get returns the value stored under key and whether it was found.
//
// Get is a pure lookup: it does not change the entry's position in the list
// or renew its expiry, and it does not check whether the entry has already
// expired. Because nothing is mutated, only the read lock is taken, so
// concurrent lookups do not block each other.
func (c *FIFOCache) Get(key Key) (value interface{}, ok bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	element, hit := c.cache[key]
	if !hit {
		return nil, false
	}
	entry, isEntry := element.Value.(*Entry)
	if !isEntry {
		return nil, false
	}
	return entry.value, true
}

// Remove deletes the entry stored under key and returns its value; ok is true
// when the entry existed.
//
// key is the key the entry was stored under.
// reason is the removal reason forwarded to OnEvictedCallback (see the
// RemoveReason* constants).
// autoFinish is forwarded to OnEvictedCallback; with the default callback,
// true makes the callback end the span itself.
func (c *FIFOCache) Remove(key Key, reason string, autoFinish bool) (value interface{}, ok bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	element, hit := c.cache[key]
	if !hit {
		return nil, false
	}

	c.removeElement(element, reason, autoFinish)

	entry, isEntry := element.Value.(*Entry)
	if !isEntry {
		return nil, false
	}
	return entry.value, true
}

// removeOldest evicts the element at the back of the list, if any, passing
// reason to the eviction callback with autoFinished set to true.
// The caller must hold c.lock.
func (c *FIFOCache) removeOldest(reason string) {
	if oldest := c.ll.Back(); oldest != nil {
		c.removeElement(oldest, reason, true)
	}
}

// removeElement unlinks e from the list, drops its key from the lookup map
// and then notifies OnEvictedCallback (when set) with the given reason and
// autoFinished flag. If e does not hold an *Entry, only the list removal
// happens. The caller must hold c.lock.
func (c *FIFOCache) removeElement(e *list.Element, reason string, autoFinished bool) {
	// list.Remove hands back the element's Value, so unlink and unwrap in one step.
	entry, isEntry := c.ll.Remove(e).(*Entry)
	if !isEntry {
		return
	}

	delete(c.cache, entry.key)

	if c.OnEvictedCallback == nil {
		return
	}
	c.OnEvictedCallback(entry.key, entry.value, reason, autoFinished)
}

// Len returns the number of entries currently held in the cache, or 0 when
// the cache map has not been initialised.
//
// The read lock is taken because Set, Remove, ClearAll and the expiry sweep
// all mutate (or replace) the list and map concurrently. Since the lock is
// not reentrant, Len must not be called from inside OnEvictedCallback, which
// runs while the cache lock is already held.
func (c *FIFOCache) Len() int {
	c.lock.RLock()
	defer c.lock.RUnlock()

	if c.cache == nil {
		return 0
	}
	return c.ll.Len()
}

// ClearAll empties the cache. Every stored entry is first reported to
// OnEvictedCallback (when set) with RemoveReasonCleanAll and autoFinished
// set to true; the list and the lookup map are then replaced by fresh ones.
func (c *FIFOCache) ClearAll() {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.OnEvictedCallback != nil {
		for _, element := range c.cache {
			entry, isEntry := element.Value.(*Entry)
			if !isEntry {
				continue
			}
			c.OnEvictedCallback(entry.key, entry.value, RemoveReasonCleanAll, true)
		}
	}

	c.ll = list.New()
	c.cache = make(map[interface{}]*list.Element)
}

// Close signals the background expiry goroutine (if one was started) to stop
// by closing the done channel.
//
// Close is idempotent: calling it more than once, or on a cache whose done
// channel was never created, is a no-op instead of a panic.
func (c *FIFOCache) Close() {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.done == nil {
		return
	}
	select {
	case <-c.done:
		// Already closed by an earlier call.
	default:
		close(c.done)
	}
}

// cleanInterval removes expired entries; it should not be called too
// frequently. Starting from the back of the list (the oldest entry), it keeps
// evicting with the given reason until it meets an element that is not
// expired, has no expiry set, or is not an *Entry.
func (c *FIFOCache) cleanInterval(reason string) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for oldest := c.ll.Back(); oldest != nil; oldest = c.ll.Back() {
		entry, isEntry := oldest.Value.(*Entry)
		if !isEntry || entry.ttl.IsZero() || time.Until(entry.ttl) >= 0 {
			// First element that must stay: stop scanning.
			return
		}
		c.removeElement(oldest, reason, true)
	}
}

// Defaults used by the MongoDB command monitor.
const (
	TracerKey                = "gitlab.ctyun.cn/apm/apm-sdk/sdk-go/sql/mongodriver"
	defaultTTLInMinute int32 = 10         // maximum cache lifetime in minutes; spans alive longer than 10 minutes are cleaned up
	defaultMaxEntries        = 10 * 10000 // cache at most 100,000 spans (about 133MB of data, per the original author's estimate)
)

// SpanKey is the cache key for an in-flight command's span. It is built from
// the ConnectionID and RequestID of the command event, so the started event
// and the matching finished event map to the same key.
type SpanKey struct {
	ConnectionID string
	RequestID    int64
}

// monitor holds the state behind the command-monitor callbacks: a cache of
// spans for commands that have started but not yet finished.
type monitor struct {
	cache *FIFOCache
}

// NewMonitor creates a new mongodb event CommandMonitor whose Started,
// Succeeded and Failed callbacks are backed by a span cache built with the
// package default capacity and TTL.
func NewMonitor() *event.CommandMonitor {
	m := &monitor{
		cache: NewFIFOCache(defaultMaxEntries, defaultTTLInMinute),
	}

	cm := new(event.CommandMonitor)
	cm.Started = m.Started
	cm.Succeeded = m.Succeeded
	cm.Failed = m.Failed
	return cm
}

// Started handles a command-started event: it opens a client span named
// "<collection>.<command>" (or just "<command>" when no collection name can
// be extracted), annotated with database and peer attributes, and caches the
// span under the event's connection/request IDs so Finished can end it.
func (m *monitor) Started(ctx context.Context, evt *event.CommandStartedEvent) {
	host, portNumber := m.peerInfo(evt)
	attrs := []attribute.KeyValue{
		semconv.DBSystemMongoDB,
		semconv.DBOperation(evt.CommandName),
		semconv.DBName(evt.DatabaseName),
		semconv.NetPeerName(host),
		semconv.NetPeerPort(portNumber),
		semconv.NetTransportTCP,
	}

	spanName := evt.CommandName
	if collection, err := m.extractCollection(evt); err == nil && collection != "" {
		attrs = append(attrs, semconv.DBMongoDBCollection(collection))
		spanName = collection + "." + evt.CommandName
	}

	tracer := otel.GetTracerProvider().Tracer(TracerKey)
	_, span := tracer.Start(
		ctx,
		spanName,
		trace.WithSpanKind(trace.SpanKindClient),
		trace.WithAttributes(attrs...),
	)

	m.cache.Set(SpanKey{ConnectionID: evt.ConnectionID, RequestID: evt.RequestID}, span)
}

// Succeeded handles a command-succeeded event by finishing the matching span
// without an error.
func (m *monitor) Succeeded(ctx context.Context, evt *event.CommandSucceededEvent) {
	finished := &evt.CommandFinishedEvent
	m.Finished(ctx, finished, nil)
}

// Failed handles a command-failed event by finishing the matching span with
// an error built from the event's Failure field.
func (m *monitor) Failed(ctx context.Context, evt *event.CommandFailedEvent) {
	finished := &evt.CommandFinishedEvent
	cause := fmt.Errorf("%s", evt.Failure)
	m.Finished(ctx, finished, cause)
}

// Finished ends the span cached for the given finished event. The span is
// taken out of the cache with autoFinish disabled, so it is ended here rather
// than by the eviction callback. When err is non-nil the span status is set
// to Error with err's message. Nothing happens if no span is cached for the
// event or the cached value is not a trace.Span.
func (m *monitor) Finished(ctx context.Context, evt *event.CommandFinishedEvent, err error) {
	cached, found := m.cache.Remove(
		SpanKey{ConnectionID: evt.ConnectionID, RequestID: evt.RequestID},
		RemoveReasonExplicit,
		false,
	)
	if !found {
		return
	}

	if span, isSpan := cached.(trace.Span); isSpan {
		if err != nil {
			span.SetStatus(codes.Error, err.Error())
		}
		span.End()
	}
}

// extractCollection reads the collection name from the first element of the
// command document. The name is returned only when that element's key equals
// the command name and its value is a BSON string. A non-string value yields
// an empty name with a nil error; a key that cannot be read or does not match
// yields a "collection name not found" error.
func (m *monitor) extractCollection(evt *event.CommandStartedEvent) (string, error) {
	first, err := evt.Command.IndexErr(0)
	if err != nil {
		return "", err
	}

	name, keyErr := first.KeyErr()
	if keyErr != nil || name != evt.CommandName {
		return "", fmt.Errorf("collection name not found")
	}

	var val bson.RawValue
	if val, err = first.ValueErr(); err != nil {
		return "", err
	}
	if val.Type != bsontype.String {
		// Not a collection name; reported as "no collection" rather than an error.
		return "", nil
	}
	return val.StringValue(), nil
}

// peerInfo derives the peer hostname and port from the event's ConnectionID.
// Anything from the first '[' onwards is dropped, then the remainder is split
// at the first ':' into hostname and port.
//
// The port defaults to 27017 when the ID carries no port or when the port
// text is not a valid integer; a parse failure no longer overwrites the
// default with 0.
func (m *monitor) peerInfo(evt *event.CommandStartedEvent) (hostname string, port int) {
	const defaultPort = 27017

	hostname, port = evt.ConnectionID, defaultPort
	if idx := strings.IndexByte(hostname, '['); idx >= 0 {
		hostname = hostname[:idx]
	}
	if idx := strings.IndexByte(hostname, ':'); idx >= 0 {
		if parsed, err := strconv.Atoi(hostname[idx+1:]); err == nil {
			port = parsed
		}
		hostname = hostname[:idx]
	}
	return hostname, port
}

     

0条评论
0 / 1000
songziyang
6文章数
0粉丝数
songziyang
6 文章 | 0 粉丝
原创

基于LRU算法实现mongodb的monitor中间件上报trace数据

2023-05-23 10:45:58
30
0

1: 背景

     由于需要将 MongoDB 的链路数据上报至 opentelemetry-collector,根据 mongo-driver 文档说明,只需初始化 CommandMonitor 并实现 Started、Succeeded、Failed 三个回调函数,即可捕获相关链路数据。

2: 要求

     · 线程安全

     · 支持设置缓存的最大条目数,达到上限后按 LRU 策略淘汰数据

     · 支持缓存数据最长存活时间设置,达到时间的缓存数据进行清理

     · 支持设置数据从缓存中淘汰时的处理策略(回调函数)

     · 支持数据冲突时,更新数据位置以及过期时间

3: 代码实现

package mongodriver

import (
	"container/list"
	"context"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/bsontype"
	"go.mongodb.org/mongo-driver/event"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
)

// FIFOCache is a FIFO cache intended for concurrent use. Entries are kept in
// a linked list ordered by when they were last written, and a background
// goroutine started by NewFIFOCache periodically removes expired entries
// when a TTL is configured.
type FIFOCache struct {
	// MaxEntries is the maximum number of entries to keep. Set skips the
	// size check when it is 0.
	MaxEntries int
	// OnEvictedCallback is invoked whenever an entry is removed from the cache.
	// It is called while the cache lock is held.
	OnEvictedCallback func(key Key, value interface{}, reason string, autoFinished bool)
	// ll records entry ordering: most recently written at the front, oldest at the back.
	ll    *list.List
	cache map[interface{}]*list.Element

	lock        sync.RWMutex
	ttlInMinute int32
	done        chan struct{}
}

// Reasons passed to OnEvictedCallback describing why an entry left the cache.
const (
	RemoveReasonExpired  = "EXPIRED"   // removed because the entry's TTL elapsed
	RemoveReasonExplicit = "EXPLICIT"  // removed through the normal path (explicit removal)
	RemoveReasonSize     = "MAX_SIZE"  // removed because the cache exceeded its capacity
	RemoveReasonCleanAll = "CLEAN_ALL" // removed because all data was cleared

	defaultEvictedSpanKey = "evicted" // attribute key written to the span when a removal is triggered
)

// Key is the type of cache keys; it is used as a map key by FIFOCache.
type Key interface{}

// Entry is the payload stored in each linked-list element of a FIFOCache.
type Entry struct {
	key   Key
	value interface{}
	ttl   time.Time // expiry time of the entry; the zero value means no expiry
}

// NewFIFOCache creates a new cache.
//
// maxEntries is the maximum number of entries to keep. It must not be
// negative; 0 disables the size limit (Set skips the size check when
// MaxEntries is 0).
//
// ttlInMinute is the maximum lifetime of a cached entry, in minutes. A value
// of 0 (or less) disables expiry. When expiry is enabled, a background
// goroutine sweeps expired entries every 10 seconds until Close is called,
// so very small TTLs are not a good fit for this cache.
//
// The default OnEvictedCallback, when autoFinished is true and the value is a
// trace.Span, tags the span with the eviction reason and ends it.
//
// NewFIFOCache panics if maxEntries is negative.
func NewFIFOCache(maxEntries int, ttlInMinute int32) *FIFOCache {
	if maxEntries < 0 {
		panic("maxEntries must not be negative")
	}

	cache := &FIFOCache{
		MaxEntries: maxEntries,
		OnEvictedCallback: func(key Key, value interface{}, reason string, autoFinished bool) {
			if !autoFinished {
				// The caller takes responsibility for finishing the span.
				return
			}
			span, ok := value.(trace.Span)
			if !ok {
				return
			}
			span.SetAttributes(attribute.String(defaultEvictedSpanKey, reason))
			span.End()
		},
		ll:          list.New(),
		cache:       make(map[interface{}]*list.Element, maxEntries),
		done:        make(chan struct{}),
		ttlInMinute: ttlInMinute,
	}

	if ttlInMinute > 0 {
		go func() {
			// A single ticker avoids allocating a fresh timer on every loop
			// iteration, which time.After would do.
			ticker := time.NewTicker(10 * time.Second)
			defer ticker.Stop()

			for {
				select {
				case <-cache.done:
					println("stop cache goroutine")
					return
				case <-ticker.C:
					cache.cleanInterval(RemoveReasonExpired)
				}
			}
		}()
	}

	return cache
}

// Set stores value under key. If the key is already present, its value is
// overwritten, its expiry (when a TTL is configured) is renewed and the entry
// is moved to the front of the list. Otherwise a new entry is pushed to the
// front, and the oldest entry is evicted if the cache has grown past
// MaxEntries.
func (c *FIFOCache) Set(key Key, value interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()

	existing, found := c.cache[key]
	if !found {
		fresh := &Entry{key: key, value: value}
		if c.ttlInMinute > 0 {
			fresh.ttl = time.Now().Add(time.Minute * time.Duration(c.ttlInMinute))
		}
		c.cache[key] = c.ll.PushFront(fresh)

		// A MaxEntries of 0 means the size is not limited.
		if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
			c.removeOldest(RemoveReasonSize)
		}
		return
	}

	// Key collision: refresh position, value and expiry of the stored entry.
	c.ll.MoveToFront(existing)
	entry, isEntry := existing.Value.(*Entry)
	if !isEntry {
		return
	}
	entry.value = value
	if c.ttlInMinute > 0 {
		entry.ttl = time.Now().Add(time.Minute * time.Duration(c.ttlInMinute))
	}
}

// Get returns the value stored under key and whether it was found.
//
// Get is a pure lookup: it does not change the entry's position in the list
// or renew its expiry, and it does not check whether the entry has already
// expired. Because nothing is mutated, only the read lock is taken, so
// concurrent lookups do not block each other.
func (c *FIFOCache) Get(key Key) (value interface{}, ok bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	element, hit := c.cache[key]
	if !hit {
		return nil, false
	}
	entry, isEntry := element.Value.(*Entry)
	if !isEntry {
		return nil, false
	}
	return entry.value, true
}

// Remove deletes the entry stored under key and returns its value; ok is true
// when the entry existed.
//
// key is the key the entry was stored under.
// reason is the removal reason forwarded to OnEvictedCallback (see the
// RemoveReason* constants).
// autoFinish is forwarded to OnEvictedCallback; with the default callback,
// true makes the callback end the span itself.
func (c *FIFOCache) Remove(key Key, reason string, autoFinish bool) (value interface{}, ok bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	element, hit := c.cache[key]
	if !hit {
		return nil, false
	}

	c.removeElement(element, reason, autoFinish)

	entry, isEntry := element.Value.(*Entry)
	if !isEntry {
		return nil, false
	}
	return entry.value, true
}

// removeOldest evicts the element at the back of the list, if any, passing
// reason to the eviction callback with autoFinished set to true.
// The caller must hold c.lock.
func (c *FIFOCache) removeOldest(reason string) {
	if oldest := c.ll.Back(); oldest != nil {
		c.removeElement(oldest, reason, true)
	}
}

// removeElement unlinks e from the list, drops its key from the lookup map
// and then notifies OnEvictedCallback (when set) with the given reason and
// autoFinished flag. If e does not hold an *Entry, only the list removal
// happens. The caller must hold c.lock.
func (c *FIFOCache) removeElement(e *list.Element, reason string, autoFinished bool) {
	// list.Remove hands back the element's Value, so unlink and unwrap in one step.
	entry, isEntry := c.ll.Remove(e).(*Entry)
	if !isEntry {
		return
	}

	delete(c.cache, entry.key)

	if c.OnEvictedCallback == nil {
		return
	}
	c.OnEvictedCallback(entry.key, entry.value, reason, autoFinished)
}

// Len returns the number of entries currently held in the cache, or 0 when
// the cache map has not been initialised.
//
// The read lock is taken because Set, Remove, ClearAll and the expiry sweep
// all mutate (or replace) the list and map concurrently. Since the lock is
// not reentrant, Len must not be called from inside OnEvictedCallback, which
// runs while the cache lock is already held.
func (c *FIFOCache) Len() int {
	c.lock.RLock()
	defer c.lock.RUnlock()

	if c.cache == nil {
		return 0
	}
	return c.ll.Len()
}

// ClearAll empties the cache. Every stored entry is first reported to
// OnEvictedCallback (when set) with RemoveReasonCleanAll and autoFinished
// set to true; the list and the lookup map are then replaced by fresh ones.
func (c *FIFOCache) ClearAll() {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.OnEvictedCallback != nil {
		for _, element := range c.cache {
			entry, isEntry := element.Value.(*Entry)
			if !isEntry {
				continue
			}
			c.OnEvictedCallback(entry.key, entry.value, RemoveReasonCleanAll, true)
		}
	}

	c.ll = list.New()
	c.cache = make(map[interface{}]*list.Element)
}

// Close signals the background expiry goroutine (if one was started) to stop
// by closing the done channel.
//
// Close is idempotent: calling it more than once, or on a cache whose done
// channel was never created, is a no-op instead of a panic.
func (c *FIFOCache) Close() {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.done == nil {
		return
	}
	select {
	case <-c.done:
		// Already closed by an earlier call.
	default:
		close(c.done)
	}
}

// cleanInterval removes expired entries; it should not be called too
// frequently. Starting from the back of the list (the oldest entry), it keeps
// evicting with the given reason until it meets an element that is not
// expired, has no expiry set, or is not an *Entry.
func (c *FIFOCache) cleanInterval(reason string) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for oldest := c.ll.Back(); oldest != nil; oldest = c.ll.Back() {
		entry, isEntry := oldest.Value.(*Entry)
		if !isEntry || entry.ttl.IsZero() || time.Until(entry.ttl) >= 0 {
			// First element that must stay: stop scanning.
			return
		}
		c.removeElement(oldest, reason, true)
	}
}

// Defaults used by the MongoDB command monitor.
const (
	TracerKey                = "gitlab.ctyun.cn/apm/apm-sdk/sdk-go/sql/mongodriver"
	defaultTTLInMinute int32 = 10         // maximum cache lifetime in minutes; spans alive longer than 10 minutes are cleaned up
	defaultMaxEntries        = 10 * 10000 // cache at most 100,000 spans (about 133MB of data, per the original author's estimate)
)

// SpanKey is the cache key for an in-flight command's span. It is built from
// the ConnectionID and RequestID of the command event, so the started event
// and the matching finished event map to the same key.
type SpanKey struct {
	ConnectionID string
	RequestID    int64
}

// monitor holds the state behind the command-monitor callbacks: a cache of
// spans for commands that have started but not yet finished.
type monitor struct {
	cache *FIFOCache
}

// NewMonitor creates a new mongodb event CommandMonitor whose Started,
// Succeeded and Failed callbacks are backed by a span cache built with the
// package default capacity and TTL.
func NewMonitor() *event.CommandMonitor {
	m := &monitor{
		cache: NewFIFOCache(defaultMaxEntries, defaultTTLInMinute),
	}

	cm := new(event.CommandMonitor)
	cm.Started = m.Started
	cm.Succeeded = m.Succeeded
	cm.Failed = m.Failed
	return cm
}

// Started handles a command-started event: it opens a client span named
// "<collection>.<command>" (or just "<command>" when no collection name can
// be extracted), annotated with database and peer attributes, and caches the
// span under the event's connection/request IDs so Finished can end it.
func (m *monitor) Started(ctx context.Context, evt *event.CommandStartedEvent) {
	host, portNumber := m.peerInfo(evt)
	attrs := []attribute.KeyValue{
		semconv.DBSystemMongoDB,
		semconv.DBOperation(evt.CommandName),
		semconv.DBName(evt.DatabaseName),
		semconv.NetPeerName(host),
		semconv.NetPeerPort(portNumber),
		semconv.NetTransportTCP,
	}

	spanName := evt.CommandName
	if collection, err := m.extractCollection(evt); err == nil && collection != "" {
		attrs = append(attrs, semconv.DBMongoDBCollection(collection))
		spanName = collection + "." + evt.CommandName
	}

	tracer := otel.GetTracerProvider().Tracer(TracerKey)
	_, span := tracer.Start(
		ctx,
		spanName,
		trace.WithSpanKind(trace.SpanKindClient),
		trace.WithAttributes(attrs...),
	)

	m.cache.Set(SpanKey{ConnectionID: evt.ConnectionID, RequestID: evt.RequestID}, span)
}

// Succeeded handles a command-succeeded event by finishing the matching span
// without an error.
func (m *monitor) Succeeded(ctx context.Context, evt *event.CommandSucceededEvent) {
	finished := &evt.CommandFinishedEvent
	m.Finished(ctx, finished, nil)
}

// Failed handles a command-failed event by finishing the matching span with
// an error built from the event's Failure field.
func (m *monitor) Failed(ctx context.Context, evt *event.CommandFailedEvent) {
	finished := &evt.CommandFinishedEvent
	cause := fmt.Errorf("%s", evt.Failure)
	m.Finished(ctx, finished, cause)
}

// Finished ends the span cached for the given finished event. The span is
// taken out of the cache with autoFinish disabled, so it is ended here rather
// than by the eviction callback. When err is non-nil the span status is set
// to Error with err's message. Nothing happens if no span is cached for the
// event or the cached value is not a trace.Span.
func (m *monitor) Finished(ctx context.Context, evt *event.CommandFinishedEvent, err error) {
	cached, found := m.cache.Remove(
		SpanKey{ConnectionID: evt.ConnectionID, RequestID: evt.RequestID},
		RemoveReasonExplicit,
		false,
	)
	if !found {
		return
	}

	if span, isSpan := cached.(trace.Span); isSpan {
		if err != nil {
			span.SetStatus(codes.Error, err.Error())
		}
		span.End()
	}
}

// extractCollection reads the collection name from the first element of the
// command document. The name is returned only when that element's key equals
// the command name and its value is a BSON string. A non-string value yields
// an empty name with a nil error; a key that cannot be read or does not match
// yields a "collection name not found" error.
func (m *monitor) extractCollection(evt *event.CommandStartedEvent) (string, error) {
	first, err := evt.Command.IndexErr(0)
	if err != nil {
		return "", err
	}

	name, keyErr := first.KeyErr()
	if keyErr != nil || name != evt.CommandName {
		return "", fmt.Errorf("collection name not found")
	}

	var val bson.RawValue
	if val, err = first.ValueErr(); err != nil {
		return "", err
	}
	if val.Type != bsontype.String {
		// Not a collection name; reported as "no collection" rather than an error.
		return "", nil
	}
	return val.StringValue(), nil
}

// peerInfo derives the peer hostname and port from the event's ConnectionID.
// Anything from the first '[' onwards is dropped, then the remainder is split
// at the first ':' into hostname and port.
//
// The port defaults to 27017 when the ID carries no port or when the port
// text is not a valid integer; a parse failure no longer overwrites the
// default with 0.
func (m *monitor) peerInfo(evt *event.CommandStartedEvent) (hostname string, port int) {
	const defaultPort = 27017

	hostname, port = evt.ConnectionID, defaultPort
	if idx := strings.IndexByte(hostname, '['); idx >= 0 {
		hostname = hostname[:idx]
	}
	if idx := strings.IndexByte(hostname, ':'); idx >= 0 {
		if parsed, err := strconv.Atoi(hostname[idx+1:]); err == nil {
			port = parsed
		}
		hostname = hostname[:idx]
	}
	return hostname, port
}

     

文章来自个人专栏
AI多面手
6 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0