From 76dde35da4a746157d015f69bfb57e0890d3e3f2 Mon Sep 17 00:00:00 2001 From: Wangjunqi123 Date: Tue, 25 Jun 2024 16:48:40 +0800 Subject: [PATCH] server: use elasticsearch search template to query --- server/elasticClient/client.go | 43 ++++++++++++++++++++++----- server/handler/searchHandle.go | 20 +++++-------- server/service/cluster/LogTimeAixs.go | 13 ++++++-- 3 files changed, 52 insertions(+), 24 deletions(-) diff --git a/server/elasticClient/client.go b/server/elasticClient/client.go index a90fe92..871fcaa 100644 --- a/server/elasticClient/client.go +++ b/server/elasticClient/client.go @@ -1,8 +1,10 @@ package elasticClient import ( + "bytes" "context" "crypto/tls" + "encoding/json" "fmt" "io" "net" @@ -17,6 +19,7 @@ import ( "gitee.com/openeuler/PilotGo-plugin-elk/global/template" "gitee.com/openeuler/PilotGo-plugin-elk/pluginclient" elastic "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" ) var Global_elastic *ElasticClient_v7 @@ -60,6 +63,7 @@ func InitElasticClient() { Global_elastic.initSearchTemplate() } +// 在elasticsearch中添加查询模板 func (client *ElasticClient_v7) initSearchTemplate() { for key, value := range template.DSL_template_map { reqbody := strings.NewReader(value) @@ -76,7 +80,8 @@ func (client *ElasticClient_v7) initSearchTemplate() { } } -func (client *ElasticClient_v7) SearchByQueryRequestBody(index string, body io.Reader) ([]byte, error) { +// 通过dsl查询 +func (client *ElasticClient_v7) SearchByDsl(index string, body io.Reader) ([]byte, error) { resp, err := client.Client.Search( client.Client.Search.WithContext(client.Ctx), client.Client.Search.WithIndex(index), @@ -84,18 +89,40 @@ func (client *ElasticClient_v7) SearchByQueryRequestBody(index string, body io.R client.Client.Search.WithTrackTotalHits(true), client.Client.Search.WithPretty(), ) + return client.processApiResult(resp, err) +} + +// 通过调用template模板查询 +func (client *ElasticClient_v7) SearchByTemplate(index string, querybody map[string]interface{}) ([]byte, error) { + query_body_bytes, err := json.Marshal(querybody) if err != nil { - err = errors.Errorf("%+v **errstack**0", err.Error()) + err = errors.Errorf("%s **errstack**0", err.Error()) return nil, err } - defer resp.Body.Close() - if resp.IsError() { - err = errors.Errorf("%+v **errstack**0", resp.String()) - return nil, err + query_body_reader := bytes.NewReader(query_body_bytes) + resp, err := client.Client.SearchTemplate( + query_body_reader, + client.Client.SearchTemplate.WithContext(context.Background()), + client.Client.SearchTemplate.WithIndex(index), + client.Client.SearchTemplate.WithPretty(), + ) + return client.processApiResult(resp, err) +} + +// 处理elasticsearch client接口的返回值 +func (client *ElasticClient_v7) processApiResult(_resp *esapi.Response, _err error) ([]byte, error) { + if _err != nil { + _err = errors.Errorf("%+v **errstack**0", _err.Error()) + return nil, _err + } + defer _resp.Body.Close() + if _resp.IsError() { + _err = errors.Errorf("%+v **errstack**0", _resp.String()) + return nil, _err } else { - resp_body_bytes, err := io.ReadAll(resp.Body) + resp_body_bytes, err := io.ReadAll(_resp.Body) if err != nil { - err = errors.Errorf("%+v **warn**0", err.Error()) + err = errors.Errorf("%s **errstack**0", err.Error()) return nil, err } return resp_body_bytes, nil diff --git a/server/handler/searchHandle.go b/server/handler/searchHandle.go index 5c6de5f..e8f6710 100644 --- a/server/handler/searchHandle.go +++ b/server/handler/searchHandle.go @@ -1,7 +1,6 @@ package handler import ( - "bytes" "encoding/json" "io" @@ -31,26 +30,21 @@ func Search_LogTimeAxisDataHandle(ctx *gin.Context) { return } req_body := struct { - Index string `json:"index"` - DSL interface{} `json:"dsl"` + Index string `json:"index"` + Id string `json:"id"` + Params map[string]interface{} `json:"params"` }{} err = json.Unmarshal(req_body_bytes, &req_body) if err != nil { newError(ctx, err) return } - query_body_bytes, err := json.Marshal(req_body.DSL) - if err != nil { - newError(ctx, err) - return + query_body := map[string]interface{}{ + "id": req_body.Id, + "params": req_body.Params, } - resp_body_bytes, err := elasticClient.Global_elastic.Search(req_body.Index, bytes.NewReader(query_body_bytes)) - if err != nil { - wrapError(ctx, err) - return - } - data, err := cluster.ProcessLogTimeAixsData(resp_body_bytes) + data, err := cluster.ProcessLogTimeAixsData(req_body.Index, query_body) if err != nil { wrapError(ctx, err) return diff --git a/server/service/cluster/LogTimeAixs.go b/server/service/cluster/LogTimeAixs.go index 2b400ae..8571220 100644 --- a/server/service/cluster/LogTimeAixs.go +++ b/server/service/cluster/LogTimeAixs.go @@ -1,13 +1,20 @@ package cluster import ( + "gitee.com/openeuler/PilotGo-plugin-elk/elasticClient" + "github.com/pkg/errors" "github.com/tidwall/gjson" ) -func ProcessLogTimeAixsData(raw_results_bytes []byte) ([]map[string]interface{}, error) { - log_type_datas := []map[string]interface{}{} +func ProcessLogTimeAixsData(index string, querybody map[string]interface{}) ([]map[string]interface{}, error) { + search_result_body_bytes, err := elasticClient.Global_elastic.SearchByTemplate(index, querybody) + if err != nil { + err = errors.Wrap(err, "fail to process log timeaxis data") + return nil, err + } - hostname_agg_raw_arr := gjson.GetBytes(raw_results_bytes, "aggregations.1.buckets").Array() + log_type_datas := []map[string]interface{}{} + hostname_agg_raw_arr := gjson.GetBytes(search_result_body_bytes, "aggregations.1.buckets").Array() for _, log_type_data_raw := range hostname_agg_raw_arr { log_type_data := map[string]interface{}{} log_timestamp_datas := [][]interface{}{} -- Gitee