Parca uses Badger to store metadata and FrostDB, a columnar store, to store the sampled profiling data.
A quick walkthrough of the integration
The official FrostDB documentation ships a few simple integration demos that are worth studying.
Reference code: pkg/parca/parca.go, pkg/scrape/manager.go, pkg/scrape/scrape.go. To enable persistence you need an object store; FrostDB supports persisting its data to object storage (see the sketch below).
Because much of Parca is modeled on Prometheus, the FrostDB integration is wrapped up like a Prometheus-style service.
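Before diving into Parca's wiring, here is a minimal sketch of standing up a persistent FrostDB column store on its own. Only frostdb.New, ReplayWALs, DB and Close appear in the Parca code below; the persistence options used here (WithWAL, WithStoragePath) are assumptions about the FrostDB version in use, and wiring up an object-store bucket is left out because that option has changed between releases.

package main

import (
	"context"
	"log"

	"github.com/polarsignals/frostdb"
)

func main() {
	ctx := context.Background()

	// Assumption: WithWAL/WithStoragePath are the persistence options of the
	// vendored FrostDB version; check its docs for the object-storage option.
	col, err := frostdb.New(
		frostdb.WithWAL(),
		frostdb.WithStoragePath("./data"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer col.Close()

	// Replay WAL segments from a previous run before serving reads/writes.
	if err := col.ReplayWALs(ctx); err != nil {
		log.Fatal(err)
	}

	// Open (or create) the logical database Parca uses.
	db, err := col.DB(ctx, "parca")
	if err != nil {
		log.Fatal(err)
	}
	_ = db // tables are created from a schema, as shown in the Parca code below
}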
- Initialization
pkg/parca/parca.go and pkg/scrape/manager.go
The column store is initialized as follows:
col, err := frostdb.New(frostdbOptions...)
if err != nil {
	level.Error(logger).Log("msg", "failed to initialize storage", "err", err)
	return err
}
if err := col.ReplayWALs(context.Background()); err != nil {
	level.Error(logger).Log("msg", "failed to replay WAL", "err", err)
	return err
}
colDB, err := col.DB(ctx, "parca")
if err != nil {
	level.Error(logger).Log("msg", "failed to load database", "err", err)
	return err
}
schema, err := parcacol.Schema()
if err != nil {
	level.Error(logger).Log("msg", "failed to get schema", "err", err)
	return err
}
table, err := colDB.Table("stacktraces", frostdb.NewTableConfig(schema))
if err != nil {
	level.Error(logger).Log("msg", "create table", "err", err)
	return err
}
s := profilestore.NewProfileColumnStore(
	logger,
	tracerProvider.Tracer("profilestore"),
	metastore,
	table,
	schema,
	flags.StorageDebugValueLog,
)
Usage in the scrape Manager (pkg/scrape/manager.go):
func (m *Manager) reload() {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()
	var wg sync.WaitGroup
	level.Debug(m.logger).Log("msg", "Reloading scrape manager")
	for setName, groups := range m.targetSets {
		var sp *scrapePool
		existing, ok := m.scrapePools[setName]
		if !ok {
			scrapeConfig, ok := m.scrapeConfigs[setName]
			if !ok {
				level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
				return
			}
			// This call is the key piece: pull-mode scraping hands the column store
			// (m.store) to each scrape pool; see the data-write section for how
			// samples are written.
			sp = newScrapePool(scrapeConfig, m.store, log.With(m.logger, "scrape_pool", setName), m.externalLabels, &scrapePoolMetrics{
				targetIntervalLength:          m.targetIntervalLength,
				targetReloadIntervalLength:    m.targetReloadIntervalLength,
				targetSyncIntervalLength:      m.targetSyncIntervalLength,
				targetScrapePoolSyncsCounter:  m.targetScrapePoolSyncsCounter,
				targetScrapeSampleLimit:       m.targetScrapeSampleLimit,
				targetScrapeSampleDuplicate:   m.targetScrapeSampleDuplicate,
				targetScrapeSampleOutOfOrder:  m.targetScrapeSampleOutOfOrder,
				targetScrapeSampleOutOfBounds: m.targetScrapeSampleOutOfBounds,
			})
			m.scrapePools[setName] = sp
		} else {
			sp = existing
		}
		wg.Add(1)
		// Run the sync in parallel as these take a while and at high load can't catch up.
		go func(sp *scrapePool, groups []*targetgroup.Group) {
			sp.Sync(groups)
			wg.Done()
		}(sp, groups)
	}
	wg.Wait()
}
- Data writes
pkg/scrape/scrape.go
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
	select {
	case <-time.After(sl.scraper.offset(interval)):
		// Continue after a scraping offset.
	case <-sl.scrapeCtx.Done():
		close(sl.stopped)
		return
	}
	var last time.Time
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
mainLoop:
	for {
		select {
		case <-sl.ctx.Done():
			close(sl.stopped)
			return
		case <-sl.scrapeCtx.Done():
			break mainLoop
		default:
		}
		start := time.Now()
		// Only record after the first scrape.
		if !last.IsZero() {
			sl.intervalLength.WithLabelValues(interval.String()).Observe(
				time.Since(last).Seconds(),
			)
		}
		b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
		buf := bytes.NewBuffer(b)
		var profileType string
		for _, l := range sl.target.labels {
			if l.Name == ProfileName {
				profileType = l.Value
				break
			}
		}
		scrapeCtx, cancel := context.WithTimeout(sl.ctx, timeout)
		scrapeErr := sl.scraper.scrape(scrapeCtx, buf, profileType)
		cancel()
		if scrapeErr == nil {
			b = buf.Bytes()
			// NOTE: There were issues with misbehaving clients in the past
			// that occasionally returned empty results. We don't want those
			// to falsely reset our buffer size.
			if len(b) > 0 {
				sl.lastScrapeSize = len(b)
			}
			tl := sl.target.Labels()
			tl = append(tl, labels.Label{Name: "__name__", Value: profileType})
			for _, l := range sl.externalLabels {
				tl = append(tl, labels.Label{
					Name:  l.Name,
					Value: l.Value,
				})
			}
			level.Debug(sl.l).Log("msg", "appending new sample", "labels", tl.String())
			protolbls := &profilepb.LabelSet{
				Labels: []*profilepb.Label{},
			}
			for _, l := range tl {
				protolbls.Labels = append(protolbls.Labels, &profilepb.Label{
					Name:  l.Name,
					Value: l.Value,
				})
			}
			// Write the raw pprof profile into the column store via the profile store API.
			_, err := sl.store.WriteRaw(sl.ctx, &profilepb.WriteRawRequest{
				Tenant: "",
				Series: []*profilepb.RawProfileSeries{
					{
						Labels: protolbls,
						Samples: []*profilepb.RawSample{
							{
								RawProfile: buf.Bytes(),
							},
						},
					},
				},
			})
			if err != nil {
				switch errc {
				case nil:
					level.Error(sl.l).Log("msg", "WriteRaw failed for scraped profile", "err", err)
				default:
					errc <- err
				}
			}
			sl.target.health = HealthGood
			sl.target.lastScrapeDuration = time.Since(start)
			sl.target.lastError = nil
		} else {
			level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
			if errc != nil {
				errc <- scrapeErr
			}
			sl.target.health = HealthBad
			sl.target.lastScrapeDuration = time.Since(start)
			sl.target.lastError = scrapeErr
		}
		sl.buffers.Put(b)
		last = start
		sl.target.lastScrape = last
		select {
		case <-sl.ctx.Done():
			close(sl.stopped)
			return
		case <-sl.scrapeCtx.Done():
			break mainLoop
		case <-ticker.C:
		}
	}
	close(sl.stopped)
}
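Since ProfileStoreService is also registered as a gRPC service (see the server setup further down), a push-style client can write profiles through the same WriteRaw call the scrape loop uses. A minimal sketch, assuming a plaintext gRPC endpoint on localhost:7070, a pprof file on disk, and Parca's generated package under gen/proto/go/parca/profilestore/v1alpha1; the labels and profile name here are purely illustrative.

package main

import (
	"context"
	"log"
	"os"

	profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Assumption: Parca serves gRPC without TLS on localhost:7070.
	conn, err := grpc.Dial("localhost:7070", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// A pprof-encoded profile, read from a file for this sketch.
	rawProfile, err := os.ReadFile("cpu.pb.gz")
	if err != nil {
		log.Fatal(err)
	}

	client := profilestorepb.NewProfileStoreServiceClient(conn)
	_, err = client.WriteRaw(context.Background(), &profilestorepb.WriteRawRequest{
		Series: []*profilestorepb.RawProfileSeries{{
			Labels: &profilestorepb.LabelSet{Labels: []*profilestorepb.Label{
				{Name: "__name__", Value: "process_cpu"}, // illustrative profile name
				{Name: "job", Value: "example"},
			}},
			Samples: []*profilestorepb.RawSample{{RawProfile: rawProfile}},
		}},
	})
	if err != nil {
		log.Fatal(err)
	}
}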
- Data queries
FrostDB keeps the query path separate from the write path: reads go through a query engine built on the database's table provider rather than through the table handle used for writes.
pkg/parca/parca.go
q := queryservice.NewColumnQueryAPI(
	logger,
	tracerProvider.Tracer("query-service"),
	sharepb.NewShareClient(conn),
	parcacol.NewQuerier(
		tracerProvider.Tracer("querier"),
		query.NewEngine(
			memory.DefaultAllocator,
			colDB.TableProvider(),
			query.WithTracer(tracerProvider.Tracer("query-engine")),
		),
		"stacktraces",
		metastore,
	),
)
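To get a feel for what parcacol.NewQuerier does underneath, here is a hedged sketch of querying the FrostDB engine directly. The ScanTable/Filter/Execute builder and the callback signature are assumptions that have changed across FrostDB versions (compare examples/simple.go in the frostdb repo); query, logicalplan and memory refer to the frostdb query packages and the Arrow memory allocator already used in parca.go.

// Hedged sketch: scan the "stacktraces" table with the same engine wiring as above.
// Builder methods and the Execute callback signature differ between FrostDB versions.
func scanStacktraces(ctx context.Context, colDB *frostdb.DB) error {
	engine := query.NewEngine(memory.DefaultAllocator, colDB.TableProvider())
	return engine.ScanTable("stacktraces").
		// Assumption: the schema exposes a "name" column holding the profile name.
		Filter(logicalplan.Col("name").Eq(logicalplan.Literal("process_cpu"))).
		Execute(ctx, func(ctx context.Context, r arrow.Record) error {
			fmt.Println(r) // each Arrow record produced by the scan
			return nil
		})
}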
The query API is then served over gRPC, with an HTTP gateway, alongside the other services:
parcaserver := server.NewServer(reg, version)
gr.Add(
	func() error {
		return parcaserver.ListenAndServe(
			ctx,
			logger,
			flags.Port,
			flags.CORSAllowedOrigins,
			flags.PathPrefix,
			server.RegisterableFunc(func(ctx context.Context, srv *grpc.Server, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error {
				debuginfopb.RegisterDebugInfoServiceServer(srv, dbgInfo)
				profilestorepb.RegisterProfileStoreServiceServer(srv, s)
				querypb.RegisterQueryServiceServer(srv, q)
				scrapepb.RegisterScrapeServiceServer(srv, m)
				if err := debuginfopb.RegisterDebugInfoServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
					return err
				}
				if err := profilestorepb.RegisterProfileStoreServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
					return err
				}
				if err := querypb.RegisterQueryServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
					return err
				}
				if err := scrapepb.RegisterScrapeServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
					return err
				}
				return nil
			}),
		)
	},
	func(_ error) {
		ctx, cancel := context.WithTimeout(ctx, 30*time.Second) // TODO make this a graceful shutdown config setting
		defer cancel()
		level.Debug(logger).Log("msg", "server shutting down")
		err := parcaserver.Shutdown(ctx)
		if err != nil && !errors.Is(err, context.Canceled) {
			level.Error(logger).Log("msg", "error shutting down server", "err", err)
		}
		// Close the columnstore after the parcaserver has shutdown to ensure no more writes occur against it.
		if err := col.Close(); err != nil {
			level.Error(logger).Log("msg", "error closing columnstore", "err", err)
		}
	},
)
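Once the server is listening, the registered QueryService can be exercised with its generated client. A minimal sketch, assuming the default plaintext gRPC port 7070 and that the service exposes a ProfileTypes RPC with an empty request (check the query.v1alpha1 proto of the Parca version in use).

package main

import (
	"context"
	"fmt"
	"log"

	querypb "github.com/parca-dev/parca/gen/proto/go/parca/query/v1alpha1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Assumption: Parca serves gRPC without TLS on localhost:7070.
	conn, err := grpc.Dial("localhost:7070", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := querypb.NewQueryServiceClient(conn)
	// Assumption: ProfileTypes lists the profile types currently stored.
	res, err := client.ProfileTypes(context.Background(), &querypb.ProfileTypesRequest{})
	if err != nil {
		log.Fatal(err)
	}
	for _, t := range res.Types {
		fmt.Println(t)
	}
}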
Notes
Because FrostDB's API is convenient to use, Parca's integration involves little complexity; it mostly uses FrostDB directly. FrostDB's internal mechanics are nevertheless worth studying.
References
https://github.com/polarsignals/frostdb
https://www.parca.dev/docs/storage
https://github.com/polarsignals/frostdb/blob/main/examples/simple.go