diff --git a/server/elasticClient/client.go b/server/elasticClient/client.go index a90fe9215018871f5b2ffca1df8be28e9265b78e..871fcaa3c87450e4dfe4a5405e299a9b9a435659 100644 --- a/server/elasticClient/client.go +++ b/server/elasticClient/client.go @@ -1,8 +1,10 @@ package elasticClient import ( + "bytes" "context" "crypto/tls" + "encoding/json" "fmt" "io" "net" @@ -17,6 +19,7 @@ import ( "gitee.com/openeuler/PilotGo-plugin-elk/global/template" "gitee.com/openeuler/PilotGo-plugin-elk/pluginclient" elastic "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esapi" ) var Global_elastic *ElasticClient_v7 @@ -60,6 +63,7 @@ func InitElasticClient() { Global_elastic.initSearchTemplate() } +// 在elasticsearch中添加查询模板 func (client *ElasticClient_v7) initSearchTemplate() { for key, value := range template.DSL_template_map { reqbody := strings.NewReader(value) @@ -76,7 +80,8 @@ func (client *ElasticClient_v7) initSearchTemplate() { } } -func (client *ElasticClient_v7) SearchByQueryRequestBody(index string, body io.Reader) ([]byte, error) { +// 通过dsl查询 +func (client *ElasticClient_v7) SearchByDsl(index string, body io.Reader) ([]byte, error) { resp, err := client.Client.Search( client.Client.Search.WithContext(client.Ctx), client.Client.Search.WithIndex(index), @@ -84,18 +89,40 @@ func (client *ElasticClient_v7) SearchByQueryRequestBody(index string, body io.R client.Client.Search.WithTrackTotalHits(true), client.Client.Search.WithPretty(), ) + return client.processApiResult(resp, err) +} + +// 通过调用template模板查询 +func (client *ElasticClient_v7) SearchByTemplate(index string, querybody map[string]interface{}) ([]byte, error) { + query_body_bytes, err := json.Marshal(querybody) if err != nil { - err = errors.Errorf("%+v **errstack**0", err.Error()) + err = errors.Errorf("%s **errstack**0", err.Error()) return nil, err } - defer resp.Body.Close() - if resp.IsError() { - err = errors.Errorf("%+v **errstack**0", resp.String()) - return nil, err + query_body_reader := bytes.NewReader(query_body_bytes) + resp, err := client.Client.SearchTemplate( + query_body_reader, + client.Client.SearchTemplate.WithContext(context.Background()), + client.Client.SearchTemplate.WithIndex(index), + client.Client.SearchTemplate.WithPretty(), + ) + return client.processApiResult(resp, err) +} + +// 处理elasticsearch client接口的返回值 +func (client *ElasticClient_v7) processApiResult(_resp *esapi.Response, _err error) ([]byte, error) { + if _err != nil { + _err = errors.Errorf("%+v **errstack**0", _err.Error()) + return nil, _err + } + defer _resp.Body.Close() + if _resp.IsError() { + _err = errors.Errorf("%+v **errstack**0", _resp.String()) + return nil, _err } else { - resp_body_bytes, err := io.ReadAll(resp.Body) + resp_body_bytes, err := io.ReadAll(_resp.Body) if err != nil { - err = errors.Errorf("%+v **warn**0", err.Error()) + err = errors.Errorf("%s **errstack**0", err.Error()) return nil, err } return resp_body_bytes, nil diff --git a/server/handler/searchHandle.go b/server/handler/searchHandle.go index 5c6de5fa81e05763e0f3af5fbc1efd9549f68548..e8f6710b71024f675f160ce1d5901a4b8a6d21e3 100644 --- a/server/handler/searchHandle.go +++ b/server/handler/searchHandle.go @@ -1,7 +1,6 @@ package handler import ( - "bytes" "encoding/json" "io" @@ -31,26 +30,21 @@ func Search_LogTimeAxisDataHandle(ctx *gin.Context) { return } req_body := struct { - Index string `json:"index"` - DSL interface{} `json:"dsl"` + Index string `json:"index"` + Id string `json:"id"` + Params map[string]interface{} `json:"params"` }{} err = json.Unmarshal(req_body_bytes, &req_body) if err != nil { newError(ctx, err) return } - query_body_bytes, err := json.Marshal(req_body.DSL) - if err != nil { - newError(ctx, err) - return + query_body := map[string]interface{}{ + "id": req_body.Id, + "params": req_body.Params, } - resp_body_bytes, err := elasticClient.Global_elastic.Search(req_body.Index, bytes.NewReader(query_body_bytes)) - if err != nil { - wrapError(ctx, err) - return - } - data, err := cluster.ProcessLogTimeAixsData(resp_body_bytes) + data, err := cluster.ProcessLogTimeAixsData(req_body.Index, query_body) if err != nil { wrapError(ctx, err) return diff --git a/server/service/cluster/LogTimeAixs.go b/server/service/cluster/LogTimeAixs.go index 2b400ae8b7897ae7941da91b0424fee38c4eeab3..857122020da02a3e3cd0aab632bfcc21d1c594aa 100644 --- a/server/service/cluster/LogTimeAixs.go +++ b/server/service/cluster/LogTimeAixs.go @@ -1,13 +1,20 @@ package cluster import ( + "gitee.com/openeuler/PilotGo-plugin-elk/elasticClient" + "github.com/pkg/errors" "github.com/tidwall/gjson" ) -func ProcessLogTimeAixsData(raw_results_bytes []byte) ([]map[string]interface{}, error) { - log_type_datas := []map[string]interface{}{} +func ProcessLogTimeAixsData(index string, querybody map[string]interface{}) ([]map[string]interface{}, error) { + search_result_body_bytes, err := elasticClient.Global_elastic.SearchByTemplate(index, querybody) + if err != nil { + err = errors.Wrap(err, "fail to process log timeaxis data") + return nil, err + } - hostname_agg_raw_arr := gjson.GetBytes(raw_results_bytes, "aggregations.1.buckets").Array() + log_type_datas := []map[string]interface{}{} + hostname_agg_raw_arr := gjson.GetBytes(search_result_body_bytes, "aggregations.1.buckets").Array() for _, log_type_data_raw := range hostname_agg_raw_arr { log_type_data := map[string]interface{}{} log_timestamp_datas := [][]interface{}{}