Skip to content

Commit d809d34

Browse files
fix: Alias search based on time
1 parent 83a333a commit d809d34

File tree

1 file changed

+112
-22
lines changed

1 file changed

+112
-22
lines changed

db-connector.go

Lines changed: 112 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,6 +1017,20 @@ func getWorkflowExecutionByAliasSearch(ctx context.Context, aliasName, id string
10171017
"values": []string{id},
10181018
},
10191019
},
1020+
"sort": []map[string]interface{}{
1021+
{
1022+
"edited": map[string]interface{}{
1023+
"order": "desc",
1024+
"unmapped_type": "long",
1025+
},
1026+
},
1027+
{
1028+
"created": map[string]interface{}{
1029+
"order": "desc",
1030+
"unmapped_type": "long",
1031+
},
1032+
},
1033+
},
10201034
}
10211035

10221036
if err := json.NewEncoder(&buf).Encode(query); err != nil {
@@ -2504,33 +2518,98 @@ func GetApp(ctx context.Context, id string, user User, skipCache bool) (*Workflo
25042518
}
25052519

25062520
if project.DbType == "opensearch" {
2521+
indexAlias := strings.ToLower(GetESIndexPrefix(nameKey))
25072522
resp, err := project.Es.Document.Get(ctx, opensearchapi.DocumentGetReq{
2508-
Index: strings.ToLower(GetESIndexPrefix(nameKey)),
2523+
Index: indexAlias,
25092524
DocumentID: id,
25102525
})
25112526
if err != nil {
2512-
log.Printf("[WARNING] Error for %s: %s", cacheKey, err)
2513-
return workflowApp, err
2514-
}
2527+
if strings.Contains(err.Error(), "has more than one index associated with it") {
2528+
var buf bytes.Buffer
2529+
query := map[string]interface{}{
2530+
"size": 1,
2531+
"query": map[string]interface{}{
2532+
"ids": map[string]interface{}{
2533+
"values": []string{id},
2534+
},
2535+
},
2536+
"sort": []map[string]interface{}{
2537+
{
2538+
"edited": map[string]interface{}{
2539+
"order": "desc",
2540+
"unmapped_type": "long",
2541+
},
2542+
},
2543+
{
2544+
"created": map[string]interface{}{
2545+
"order": "desc",
2546+
"unmapped_type": "long",
2547+
},
2548+
},
2549+
},
2550+
}
25152551

2516-
res := resp.Inspect().Response
2517-
defer res.Body.Close()
2518-
if res.StatusCode == 404 {
2519-
return workflowApp, errors.New("App doesn't exist")
2520-
}
2552+
if err := json.NewEncoder(&buf).Encode(query); err != nil {
2553+
return workflowApp, err
2554+
}
25212555

2522-
respBody, err := ioutil.ReadAll(res.Body)
2523-
if err != nil {
2524-
return workflowApp, err
2525-
}
2556+
searchResp, serr := project.Es.Search(ctx, &opensearchapi.SearchReq{
2557+
Indices: []string{indexAlias},
2558+
Body: &buf,
2559+
})
2560+
if serr != nil {
2561+
return workflowApp, serr
2562+
}
25262563

2527-
wrapped := AppWrapper{}
2528-
err = json.Unmarshal(respBody, &wrapped)
2529-
if err != nil {
2530-
return workflowApp, err
2531-
}
2564+
searchRes := searchResp.Inspect().Response
2565+
defer searchRes.Body.Close()
2566+
searchBody, serr := ioutil.ReadAll(searchRes.Body)
2567+
if serr != nil {
2568+
return workflowApp, serr
2569+
}
2570+
2571+
if searchRes.StatusCode != 200 && searchRes.StatusCode != 201 {
2572+
return workflowApp, errors.New(fmt.Sprintf("Bad statuscode: %d, error: %s", searchRes.StatusCode, string(searchBody)))
2573+
}
25322574

2533-
workflowApp = &wrapped.Source
2575+
wrappedSearch := AppSearchWrapper{}
2576+
if serr := json.Unmarshal(searchBody, &wrappedSearch); serr != nil {
2577+
return workflowApp, serr
2578+
}
2579+
2580+
if len(wrappedSearch.Hits.Hits) == 0 {
2581+
return workflowApp, errors.New("App doesn't exist")
2582+
}
2583+
2584+
workflowApp = &wrappedSearch.Hits.Hits[0].Source
2585+
} else {
2586+
log.Printf("[WARNING] Error for %s: %s", cacheKey, err)
2587+
return workflowApp, err
2588+
}
2589+
} else {
2590+
res := resp.Inspect().Response
2591+
defer res.Body.Close()
2592+
if res.StatusCode == 404 {
2593+
return workflowApp, errors.New("App doesn't exist")
2594+
}
2595+
2596+
respBody, err := ioutil.ReadAll(res.Body)
2597+
if err != nil {
2598+
return workflowApp, err
2599+
}
2600+
2601+
if res.StatusCode != 200 && res.StatusCode != 201 {
2602+
return workflowApp, errors.New(fmt.Sprintf("Bad statuscode: %d, error: %s", res.StatusCode, string(respBody)))
2603+
}
2604+
2605+
wrapped := AppWrapper{}
2606+
err = json.Unmarshal(respBody, &wrapped)
2607+
if err != nil {
2608+
return workflowApp, err
2609+
}
2610+
2611+
workflowApp = &wrapped.Source
2612+
}
25342613
} else {
25352614
//log.Printf("[DEBUG] Getting app from datastore for ID %s", id)
25362615

@@ -2568,7 +2647,6 @@ func GetApp(ctx context.Context, id string, user User, skipCache bool) (*Workflo
25682647
}
25692648
}
25702649
}
2571-
25722650
if project.CacheDb {
25732651
data, err := json.Marshal(workflowApp)
25742652
if err != nil {
@@ -3650,6 +3728,20 @@ func getWorkflowByAliasSearch(ctx context.Context, aliasName, id string) (*Workf
36503728
"values": []string{id},
36513729
},
36523730
},
3731+
"sort": []map[string]interface{}{
3732+
{
3733+
"edited": map[string]interface{}{
3734+
"order": "desc",
3735+
"unmapped_type": "long",
3736+
},
3737+
},
3738+
{
3739+
"created": map[string]interface{}{
3740+
"order": "desc",
3741+
"unmapped_type": "long",
3742+
},
3743+
},
3744+
},
36533745
}
36543746

36553747
if err := json.NewEncoder(&buf).Encode(query); err != nil {
@@ -13970,8 +14062,6 @@ func SetDatastoreKeyBulk(ctx context.Context, allKeys []CacheKeyData) ([]Datasto
1397014062
}
1397114063
}
1397214064

13973-
13974-
1397514065
// New struct, to not add body, author etc
1397614066
if project.DbType == "opensearch" {
1397714067
var buf bytes.Buffer

0 commit comments

Comments
 (0)