Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions app/gateway/http/index_platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pkg/errors"

"github.com/CocaineCong/tangseng/app/gateway/rpc"
"github.com/CocaineCong/tangseng/consts/e"
pb "github.com/CocaineCong/tangseng/idl/pb/index_platform"
"github.com/CocaineCong/tangseng/pkg/ctl"
log "github.com/CocaineCong/tangseng/pkg/logger"
Expand All @@ -47,3 +48,27 @@ func BuildIndexByFiles(ctx *gin.Context) {

ctx.JSON(http.StatusOK, ctl.RespSuccess(ctx, r))
}

func UploadIndexByFiles(ctx *gin.Context) {
var req pb.BuildIndexReq
if err := ctx.ShouldBind(&req); err != nil {
log.LogrusObj.Errorf("Bind:%v", err)
ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "绑定参数错误"))
return
}
file, fileHeader, _ := ctx.Request.FormFile("file")
if fileHeader == nil {
err := errors.New(e.GetMsg(e.ErrorUploadFile))
ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "上传错误"))
log.LogrusObj.Error(err)
return
}
r, err := rpc.UploadByStream(ctx, &req, file, fileHeader.Size)
if err != nil {
log.LogrusObj.Errorf("rpc.BuildIndex failed, original error: %T %v", errors.Cause(err), errors.Cause(err))
ctx.JSON(http.StatusOK, ctl.RespError(ctx, err, "UploadIndexByFiles RPC服务调用错误"))
return
}

ctx.JSON(http.StatusOK, ctl.RespSuccess(ctx, r))
}
1 change: 1 addition & 0 deletions app/gateway/routes/index_platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ func IndexPlatformRegisterHandlers(rg *gin.RouterGroup) {
indexPlatformGroup := rg.Group("/index_platform")
{
indexPlatformGroup.POST("/build_index", http.BuildIndexByFiles)
indexPlatformGroup.POST("/upload_index", http.UploadIndexByFiles)
}
}
30 changes: 30 additions & 0 deletions app/gateway/rpc/index_platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package rpc

import (
"context"
"io"
"mime/multipart"

"github.com/pkg/errors"

pb "github.com/CocaineCong/tangseng/idl/pb/index_platform"
log "github.com/CocaineCong/tangseng/pkg/logger"
)

// BuildIndex 建立索引的RPC调用
Expand All @@ -35,3 +38,30 @@ func BuildIndex(ctx context.Context, req *pb.BuildIndexReq) (resp *pb.BuildIndex

return
}

func UploadByStream(ctx context.Context, req *pb.BuildIndexReq, file multipart.File, fileSize int64) (resp *pb.UploadResponse, err error) {
stream, err := IndexPlatformClient.UploadFile(ctx)
if err != nil {
err = errors.WithMessage(err, "IndexPlatformClient.UploadStream err")
return
}
buf := make([]byte, 1024*1024) // 1MB chunks
for {
n, errx := file.Read(buf)
if errx == io.EOF {
break
}
if err = stream.Send(&pb.FileChunk{
Content: buf[:n],
}); err != nil {
log.LogrusObj.Error("stream.Send", err)
return
}
}
resp, err = stream.CloseAndRecv()
if err != nil && err != io.EOF {
return nil, err
}

return resp, nil
}
113 changes: 111 additions & 2 deletions app/index_platform/service/index_platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (
"context"
"fmt"
"hash/fnv"
"io"
"os"
"sort"
"strings"
"sync"

"github.com/pkg/errors"

"github.com/RoaringBitmap/roaring"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/pkg/errors"
"github.com/spf13/cast"

"github.com/CocaineCong/tangseng/app/index_platform/analyzer"
Expand Down Expand Up @@ -234,3 +234,112 @@ func iHash(key string) int64 { // nolint:golint,unused
_, _ = h.Write([]byte(key))
return int64(h.Sum32() & 0x7fffffff)
}

func (s *IndexPlatformSrv) UploadFile(stream pb.IndexPlatformService_UploadFileServer) (err error) {
ctx := stream.Context()
// 时间估计
invertedIndex := cmap.New[*roaring.Bitmap]() // 倒排索引
dictTrie := trie.NewTrie() // 前缀树
// mapreduce 这个是用chan和goroutine来代替master和worker的rpc调用,避免了频繁的rpc调用
_, _ = mapreduce.MapReduce(func(source chan<- []byte) {
chunk, err := stream.Recv()
if err == io.EOF {
_ = stream.SendAndClose(&pb.UploadResponse{
Code: e.SUCCESS,
Message: cconsts.IndexPlatformUploadSuccess,
})
}
source <- chunk.Content
}, func(item []byte, writer mapreduce.Writer[[]*types.KeyValue], cancel func(error)) {
// 控制并发
var wg sync.WaitGroup
ch := make(chan struct{}, 3)

keyValueList := make([]*types.KeyValue, 0, 1e3)
lines := strings.Split(string(item), "\r\n")
for _, line := range lines[1:] {
ch <- struct{}{}
wg.Add(1)
docStruct, _ := input_data.Doc2Struct(line) // line 转 docs struct
if docStruct.DocId == 0 {
continue
}

// 分词
tokens, _ := analyzer.GseCutForBuildIndex(docStruct.DocId, docStruct.Body)
for _, v := range tokens {
if v.Token == "" || v.Token == " " {
continue
}
keyValueList = append(keyValueList, &types.KeyValue{Key: v.Token, Value: cast.ToString(v.DocId)})
dictTrie.Insert(v.Token)
}

// 建立正排索引
go func(docStruct *types.Document) {
err = input_data.DocData2Kfk(docStruct)
if err != nil {
logs.LogrusObj.Error(err)
}
defer wg.Done()
<-ch
}(docStruct)
}
wg.Wait()

// // 构建前缀树 // TODO: kafka异步处理一下前缀树的插入,不然占着这里的资源
// go func(tokenList []string) {
// err = input_data.DocTrie2Kfk(tokenList)
// if err != nil {
// logs.LogrusObj.Error("DocTrie2Kfk", err)
// }
// }(tokenList)

// shuffle 排序过程
sort.Sort(types.ByKey(keyValueList))
writer.Write(keyValueList)
}, func(pipe <-chan []*types.KeyValue, writer mapreduce.Writer[string], cancel func(error)) {
for values := range pipe {
for _, v := range values { // 构建倒排索引
if value, ok := invertedIndex.Get(v.Key); ok {
value.AddInt(cast.ToInt(v.Value))
invertedIndex.Set(v.Key, value)
} else {
docIds := roaring.NewBitmap()
docIds.AddInt(cast.ToInt(v.Value))
invertedIndex.Set(v.Key, docIds)
}
}
}
})

// 存储倒排索引
go func() {
newCtx := clone.NewContextWithoutDeadline()
newCtx.Clone(ctx)
err = storeInvertedIndexByHash(newCtx, invertedIndex)
if err != nil {
logs.LogrusObj.Error("storeInvertedIndexByHash error ", err)
}
}()

logs.LogrusObj.Infoln("storeInvertedIndexByHash End")

// 存储前缀树
go func() {
newCtx := clone.NewContextWithoutDeadline()
newCtx.Clone(ctx)
err = storeDictTrieByHash(newCtx, dictTrie)
if err != nil {
logs.LogrusObj.Error("storeDictTrieByHash error ", err)
logs.LogrusObj.Errorf("stack trace: \n%+v\n", err)
}
}()

return nil
}

func (s *IndexPlatformSrv) DownloadFile(file *pb.FileRequest, req pb.IndexPlatformService_DownloadFileServer) (err error) {

return nil
}
7 changes: 4 additions & 3 deletions consts/e/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ const (
ERROR = 500
InvalidParams = 400

//成员错误
// 成员错误
ErrorExistUser = 10002
ErrorNotExistUser = 10003
ErrorFailEncryption = 10006
ErrorNotCompare = 10007
ErrorUploadFile = 10008

HaveSignUp = 20001
ErrorActivityTimeout = 20002

ErrorAuthCheckTokenFail = 30001 //token 错误
ErrorAuthCheckTokenTimeout = 30002 //token 过期
ErrorAuthCheckTokenFail = 30001 // token 错误
ErrorAuthCheckTokenTimeout = 30002 // token 过期
ErrorAuthToken = 30003
ErrorAuth = 30004
ErrorAuthNotFound = 30005
Expand Down
1 change: 1 addition & 0 deletions consts/e/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var MsgFlags = map[int]string{
ErrorNotCompare: "不匹配",
ErrorDatabase: "数据库操作出错,请重试",
ErrorAuthNotFound: "Token不能为空",
ErrorUploadFile: "上传错误",
}

// GetMsg 获取状态码对应信息
Expand Down
5 changes: 5 additions & 0 deletions consts/index_platform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package consts

const (
IndexPlatformUploadSuccess = "上传成功"
)
17 changes: 17 additions & 0 deletions idl/index_platform.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,21 @@ message BuildIndexResp {

service IndexPlatformService {
rpc BuildIndexService(BuildIndexReq) returns (BuildIndexResp);
rpc UploadFile(stream FileChunk) returns (UploadResponse); // 客户端流式传输
rpc DownloadFile(FileRequest) returns (stream FileChunk); // 服务端流式传输
}

message FileChunk {
bytes content = 1;
}

message UploadResponse {
// @inject_tag:form:"code" uri:"code"
int64 code = 1;
// @inject_tag:form:"message" uri:"message"
string message = 2;
}

message FileRequest {
string filename = 1;
}
Loading
Loading