Summary:
InfluxDB's HTTP API can create users, but internally the handler rewrites the password in the request to [REDACTED] before anything is logged, so replicating the original statement to other nodes requires extra handling.
User management statements:
# Show users
SHOW USERS
# Create a user
CREATE USER "username" WITH PASSWORD 'password'
# Create a user with admin privileges
CREATE USER "username" WITH PASSWORD 'password' WITH ALL PRIVILEGES
# Drop a user
DROP USER "username"
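For context, a minimal sketch of sending one of these statements to the HTTP /query endpoint with Go's standard library (the address and credentials are placeholders, and authentication is omitted):
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// POST the statement to /query; user management statements
	// are not accepted over GET.
	resp, err := http.PostForm("http://localhost:8086/query", url.Values{
		"q": {`CREATE USER "username" WITH PASSWORD 'password'`},
	})
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}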
Core handling:
// serveQuery parses an incoming query and, if valid, executes the query.
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) {
atomic.AddInt64(&h.stats.QueryRequests, 1)
defer func(start time.Time) {
atomic.AddInt64(&h.stats.QueryRequestDuration, time.Since(start).Nanoseconds())
}(time.Now())
h.requestTracker.Add(r, user)
// Retrieve the underlying ResponseWriter or initialize our own.
rw, ok := w.(ResponseWriter)
if !ok {
rw = NewResponseWriter(w, r)
}
// Retrieve the node id the query should be executed on.
nodeID, _ := strconv.ParseUint(r.FormValue("node_id"), 10, 64)
var qr io.Reader
// Attempt to read the form value from the "q" form value.
if qp := strings.TrimSpace(r.FormValue("q")); qp != "" {
qr = strings.NewReader(qp)
} else if r.MultipartForm != nil && r.MultipartForm.File != nil {
// If we have a multipart/form-data, try to retrieve a file from 'q'.
if fhs := r.MultipartForm.File["q"]; len(fhs) > 0 {
f, err := fhs[0].Open()
if err != nil {
h.httpError(rw, err.Error(), http.StatusBadRequest)
return
}
defer f.Close()
qr = f
}
}
if qr == nil {
h.httpError(rw, `missing required parameter "q"`, http.StatusBadRequest)
return
}
epoch := strings.TrimSpace(r.FormValue("epoch"))
p := influxql.NewParser(qr)
db := r.FormValue("db")
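// Snapshot the raw "q" values now: sanitize(r) below rewrites them to
// [REDACTED], and the originals are needed later for Raft replication.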
query_ori_addr := make([]string, len(r.URL.Query()["q"]))
{
values := r.URL.Query()
for i, q := range values["q"] {
h.Logger.Info(fmt.Sprintf("QUERY i: %d, q: %s", i, q))
query_ori_addr[i] = q + ";"
}
}
// Sanitize the request query params so it doesn't show up in the response logger.
// Do this before anything else so a parsing error doesn't leak passwords.
sanitize(r)
// Parse the parameters
rawParams := r.FormValue("params")
if rawParams != "" {
var params map[string]interface{}
decoder := json.NewDecoder(strings.NewReader(rawParams))
decoder.UseNumber()
if err := decoder.Decode(&params); err != nil {
h.httpError(rw, "error parsing query parameters: "+err.Error(), http.StatusBadRequest)
return
}
// Convert json.Number into int64 and float64 values
for k, v := range params {
if v, ok := v.(json.Number); ok {
var err error
if strings.Contains(string(v), ".") {
params[k], err = v.Float64()
} else {
params[k], err = v.Int64()
}
if err != nil {
h.httpError(rw, "error parsing json value: "+err.Error(), http.StatusBadRequest)
return
}
}
}
p.SetParams(params)
}
// Parse query from query string.
q, err := p.ParseQuery()
if err != nil {
h.httpError(rw, "error parsing query: "+err.Error(), http.StatusBadRequest)
return
}
// Check authorization.
if h.Config.AuthEnabled {
if err := h.QueryAuthorizer.AuthorizeQuery(user, q, db); err != nil {
if err, ok := err.(meta.ErrAuthorize); ok {
h.Logger.Info("Unauthorized request",
zap.String("user", err.User),
zap.Stringer("query", err.Query),
logger.Database(err.Database))
}
h.httpError(rw, "error authorizing query: "+err.Error(), http.StatusForbidden)
return
}
}
// Parse chunk size. Use default if not provided or unparsable.
chunked := r.FormValue("chunked") == "true"
chunkSize := DefaultChunkSize
if chunked {
if n, err := strconv.ParseInt(r.FormValue("chunk_size"), 10, 64); err == nil && int(n) > 0 {
chunkSize = int(n)
}
}
// Parse whether this is an async command.
async := r.FormValue("async") == "true"
opts := query.ExecutionOptions{
Database: db,
RetentionPolicy: r.FormValue("rp"),
ChunkSize: chunkSize,
ReadOnly: r.Method == "GET",
NodeID: nodeID,
}
if h.Config.AuthEnabled {
if user != nil && user.AuthorizeUnrestricted() {
opts.Authorizer = query.OpenAuthorizer
} else {
// The current user determines the authorized actions.
opts.Authorizer = user
}
} else {
// Auth is disabled, so allow everything.
opts.Authorizer = query.OpenAuthorizer
}
// Make sure if the client disconnects we signal the query to abort
var closing chan struct{}
if !async {
closing = make(chan struct{})
if notifier, ok := w.(http.CloseNotifier); ok {
// CloseNotify() is not guaranteed to send a notification when the query
// is closed. Use this channel to signal that the query is finished to
// prevent lingering goroutines that may be stuck.
done := make(chan struct{})
defer close(done)
notify := notifier.CloseNotify()
go func() {
// Wait for either the request to finish
// or for the client to disconnect
select {
case <-done:
case <-notify:
close(closing)
}
}()
opts.AbortCh = done
} else {
defer close(closing)
}
}
// Execute query.
results := h.QueryExecutor.ExecuteQuery(q, opts, closing)
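// Forward the captured raw statements for Raft replication. Read-only
// statements (SHOW/SELECT) are skipped; everything else, including
// CREATE USER with the real password, goes to ServeQueryHaRaft.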
{
for _, qry := range query_ori_addr {
if "" == qry {
continue
}
qry_arr := strings.Fields(qry)
if 1 < len(qry_arr) {
qry_type := string(qry_arr[0])
runHaRaft := true
if ("SHOW" == qry_type) ||
("show" == qry_type) ||
("SELECT" == qry_type) ||
("select" == qry_type) {
runHaRaft = false
}
if runHaRaft {
go h.ServeQueryHaRaft(qry, user, opts)
}
}
}
}
// If we are running in async mode, open a goroutine to drain the results
// and return with a StatusNoContent.
if async {
go h.async(q, results)
h.writeHeader(w, http.StatusNoContent)
return
}
// if we're not chunking, this will be the in memory buffer for all results before sending to client
resp := Response{Results: make([]*query.Result, 0)}
// Status header is OK once this point is reached.
// Attempt to flush the header immediately so the client gets the header information
// and knows the query was accepted.
h.writeHeader(rw, http.StatusOK)
if w, ok := w.(http.Flusher); ok {
w.Flush()
}
// pull all results from the channel
rows := 0
for r := range results {
// Ignore nil results.
if r == nil {
continue
}
// if requested, convert result timestamps to epoch
if epoch != "" {
convertToEpoch(r, epoch)
}
// Write out result immediately if chunked.
if chunked {
n, _ := rw.WriteResponse(Response{
Results: []*query.Result{r},
})
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
w.(http.Flusher).Flush()
continue
}
// Limit the number of rows that can be returned in a non-chunked
// response. This is to prevent the server from going OOM when
// returning a large response. If you want to return more than the
// default chunk size, then use chunking to process multiple blobs.
// Iterate through the series in this result to count the rows and
// truncate any rows we shouldn't return.
if h.Config.MaxRowLimit > 0 {
for i, series := range r.Series {
n := h.Config.MaxRowLimit - rows
if n < len(series.Values) {
// We have reached the maximum number of values. Truncate
// the values within this row.
series.Values = series.Values[:n]
// Since this was truncated, it will always be a partial return.
// Add this so the client knows we truncated the response.
series.Partial = true
}
rows += len(series.Values)
if rows >= h.Config.MaxRowLimit {
// Drop any remaining series since we have already reached the row limit.
if i < len(r.Series) {
r.Series = r.Series[:i+1]
}
break
}
}
}
// It's not chunked so buffer results in memory.
// Results for statements need to be combined together.
// We need to check if this new result is for the same statement as
// the last result, or for the next statement
l := len(resp.Results)
if l == 0 {
resp.Results = append(resp.Results, r)
} else if resp.Results[l-1].StatementID == r.StatementID {
if r.Err != nil {
resp.Results[l-1] = r
continue
}
cr := resp.Results[l-1]
rowsMerged := 0
if len(cr.Series) > 0 {
lastSeries := cr.Series[len(cr.Series)-1]
for _, row := range r.Series {
if !lastSeries.SameSeries(row) {
// Next row is for a different series than last.
break
}
// Values are for the same series, so append them.
lastSeries.Values = append(lastSeries.Values, row.Values...)
rowsMerged++
}
}
// Append remaining rows as new rows.
r.Series = r.Series[rowsMerged:]
cr.Series = append(cr.Series, r.Series...)
cr.Messages = append(cr.Messages, r.Messages...)
cr.Partial = r.Partial
} else {
resp.Results = append(resp.Results, r)
}
// Drop out of this loop and do not process further results when we hit the row limit.
if h.Config.MaxRowLimit > 0 && rows >= h.Config.MaxRowLimit {
// If the result is marked as partial, remove that partial marking
// here. While the series is partial and we would normally have
// tried to return the rest in the next chunk, we are not using
// chunking and are truncating the series so we don't want to
// signal to the client that we plan on sending another JSON blob
// with another result. The series, on the other hand, still
// returns partial true if it was truncated or had more data to
// send in a future chunk.
r.Partial = false
break
}
}
// If it's not chunked we buffered everything in memory, so write it out
if !chunked {
n, _ := rw.WriteResponse(resp)
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
}
}
Key point:
query_ori_addr := make([]string, len(r.URL.Query()["q"]))
{
values := r.URL.Query()
for i, q := range values["q"] {
h.Logger.Info(fmt.Sprintf("QUERY i: %d, q: %s", i, q))
query_ori_addr[i] = q + ";"
}
}
// Sanitize the request query params so it doesn't show up in the response logger.
// Do this before anything else so a parsing error doesn't leak passwords.
sanitize(r)
// sanitize redacts passwords from query string for logging.
func sanitize(r *http.Request) {
values := r.URL.Query()
for i, q := range values["q"] {
values["q"][i] = influxql.Sanitize(q)
}
r.URL.RawQuery = values.Encode()
}
// Sanitize attempts to sanitize passwords out of a raw query.
// It looks for patterns that may be related to the SET PASSWORD and CREATE USER
// statements and will redact the password that should be there. It will attempt
// to redact information from common invalid queries too, but it's not guaranteed
// to succeed on improper queries.
//
// This function works on the raw query and attempts to retain the original input
// as much as possible.
func Sanitize(query string) string {
if matches := sanitizeSetPassword.FindAllStringSubmatchIndex(query, -1); matches != nil {
var buf bytes.Buffer
i := 0
for _, match := range matches {
buf.WriteString(query[i:match[2]])
buf.WriteString("[REDACTED]")
i = match[3]
}
buf.WriteString(query[i:])
query = buf.String()
}
if matches := sanitizeCreatePassword.FindAllStringSubmatchIndex(query, -1); matches != nil {
var buf bytes.Buffer
i := 0
for _, match := range matches {
buf.WriteString(query[i:match[2]])
buf.WriteString("[REDACTED]")
i = match[3]
}
buf.WriteString(query[i:])
query = buf.String()
}
return query
}
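A quick way to see the redaction in isolation (a sketch assuming the parser package is importable as github.com/influxdata/influxql):
package main

import (
	"fmt"

	"github.com/influxdata/influxql"
)

func main() {
	q := `CREATE USER "username" WITH PASSWORD 'password'`
	// Expected output: CREATE USER "username" WITH PASSWORD [REDACTED]
	fmt.Println(influxql.Sanitize(q))
}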
As shown above, the password in the HTTP request is replaced with [REDACTED].
Solution:
Capture the original request before the password is replaced with [REDACTED], and write it to the Raft log for replication:
query_ori_addr := make([]string, len(r.URL.Query()["q"]))
{
values := r.URL.Query()
for i, q := range values["q"] {
h.Logger.Info(fmt.Sprintf("QUERY i: %d, q: %s", i, q))
query_ori_addr[i] = q + ";"
}
}
When the query is executed on the local node, forward the request for Raft log replication:
// Execute query.
results := h.QueryExecutor.ExecuteQuery(q, opts, closing)
{
for _, qry := range query_ori_addr {
if "" == qry {
continue
}
qry_arr := strings.Fields(qry)
if 1 < len(qry_arr) {
qry_type := string(qry_arr[0])
runHaRaft := true
if ("SHOW" == qry_type) ||
("show" == qry_type) ||
("SELECT" == qry_type) ||
("select" == qry_type) {
runHaRaft = false
}
if runHaRaft {
go h.ServeQueryHaRaft(qry, user, opts)
}
}
}
}
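The post never shows ServeQueryHaRaft itself. Purely as a hypothetical sketch of the idea, a leader-side handler might marshal the raw statement and propose it to the Raft log, for example with hashicorp/raft; the raftCommand type and the h.haRaft handle below are assumptions, not InfluxDB or upstream APIs (encoding/json and time imports assumed):
// Hypothetical sketch only: raftCommand and h.haRaft (*raft.Raft from
// github.com/hashicorp/raft) are assumptions, not InfluxDB APIs.
type raftCommand struct {
	Query    string `json:"query"`    // raw statement, password included
	Database string `json:"database"` // target database from the request
}

func (h *Handler) ServeQueryHaRaft(qry string, user meta.User, opts query.ExecutionOptions) {
	cmd, err := json.Marshal(raftCommand{Query: qry, Database: opts.Database})
	if err != nil {
		h.Logger.Info("ha-raft: marshal failed: " + err.Error())
		return
	}
	// Apply appends the entry to the Raft log and blocks until it commits
	// (or the timeout elapses); each follower's FSM replays the statement
	// verbatim, so the real password survives replication.
	if f := h.haRaft.Apply(cmd, 10*time.Second); f.Error() != nil {
		h.Logger.Info("ha-raft: apply failed: " + f.Error().Error())
	}
}
Whatever the actual transport, the important property is that the log entry carries the pre-sanitize text captured earlier, not the [REDACTED] version left in the request.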