Skip to content
This repository was archived by the owner on Nov 5, 2024. It is now read-only.

Commit 2c69922

Browse files
committed
上传基础的集群相关代码
1 parent 29c3e59 commit 2c69922

25 files changed

+906
-0
lines changed

teacluster/action.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package teacluster
2+
3+
import (
4+
"time"
5+
)
6+
7+
type Action struct {
8+
Id uint64
9+
RequestTime time.Time
10+
RequestId uint64
11+
}
12+
13+
func (this *Action) Execute() error {
14+
return nil
15+
}
16+
17+
func (this *Action) OnSuccess(success *SuccessAction) error {
18+
return nil
19+
}
20+
21+
func (this *Action) OnFail(fail *FailAction) error {
22+
return nil
23+
}
24+
25+
func (this *Action) BaseAction() *Action {
26+
return this
27+
}

teacluster/action_fail.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package teacluster
2+
3+
type FailAction struct {
4+
Action
5+
6+
Message string
7+
}
8+
9+
func (this *FailAction) Name() string {
10+
return "fail"
11+
}
12+
13+
func (this *FailAction) TypeId() int8 {
14+
return 2
15+
}

teacluster/action_interface.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package teacluster
2+
3+
type ActionInterface interface {
4+
Name() string
5+
Execute() error
6+
OnSuccess(success *SuccessAction) error
7+
OnFail(fail *FailAction) error
8+
TypeId() int8
9+
BaseAction() *Action
10+
}

teacluster/action_notify.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package teacluster
2+
3+
// cluster -> slave node
4+
type NotifyAction struct {
5+
Action
6+
}
7+
8+
func (this *NotifyAction) Name() string {
9+
return "notify"
10+
}
11+
12+
func (this *NotifyAction) TypeId() int8 {
13+
return 4
14+
}
15+
16+
func (this *NotifyAction) Execute() error {
17+
ClusterManager.Write(&SumAction{})
18+
return nil
19+
}

teacluster/action_pull.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package teacluster
2+
3+
import (
4+
"github.com/TeaWeb/code/teacluster/configs"
5+
)
6+
7+
// node <- cluster
8+
type PullAction struct {
9+
Action
10+
11+
LocalItems []*configs.Item // items without data
12+
}
13+
14+
func (this *PullAction) Name() string {
15+
return "pull"
16+
}
17+
18+
func (this *PullAction) Execute() error {
19+
return nil
20+
}
21+
22+
func (this *PullAction) TypeId() int8 {
23+
return 6
24+
}

teacluster/action_push.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package teacluster
2+
3+
import (
4+
"github.com/TeaWeb/code/teacluster/configs"
5+
"github.com/iwind/TeaGo/logs"
6+
)
7+
8+
// master -> cluster
9+
type PushAction struct {
10+
Action
11+
12+
Items []*configs.Item
13+
}
14+
15+
func (this *PushAction) Name() string {
16+
return "push"
17+
}
18+
19+
func (this *PushAction) Execute() error {
20+
return nil
21+
}
22+
23+
func (this *PushAction) AddItem(item *configs.Item) {
24+
this.Items = append(this.Items, item)
25+
}
26+
27+
func (this *PushAction) OnSuccess(success *SuccessAction) error {
28+
return nil
29+
}
30+
31+
func (this *PushAction) OnFail(fail *FailAction) error {
32+
logs.Println("[push]fail:", fail.Message)
33+
34+
// TODO retry later
35+
36+
return nil
37+
}
38+
39+
func (this *PushAction) TypeId() int8 {
40+
return 5
41+
}

teacluster/action_register.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package teacluster
2+
3+
import (
4+
"github.com/TeaWeb/code/teaconfigs"
5+
"github.com/iwind/TeaGo/logs"
6+
)
7+
8+
type RegisterAction struct {
9+
Action
10+
11+
ClusterId string
12+
ClusterSecret string
13+
NodeId string
14+
NodeName string
15+
NodeRole string
16+
}
17+
18+
func (this *RegisterAction) Name() string {
19+
return "register"
20+
}
21+
22+
func (this *RegisterAction) Execute() error {
23+
return nil
24+
}
25+
26+
func (this *RegisterAction) OnSuccess(success *SuccessAction) error {
27+
if this.NodeRole == teaconfigs.NodeRoleMaster {
28+
logs.Println("[cluster]register master ok")
29+
ClusterManager.Write(&SumAction{})
30+
} else {
31+
logs.Println("[cluster]register node ok")
32+
}
33+
return nil
34+
}
35+
36+
func (this *RegisterAction) OnFail(fail *FailAction) error {
37+
logs.Println("[cluster]fail to register node:", fail.Message)
38+
return nil
39+
}
40+
41+
func (this *RegisterAction) TypeId() int8 {
42+
return 3
43+
}

teacluster/action_success.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package teacluster
2+
3+
import "github.com/iwind/TeaGo/maps"
4+
5+
type SuccessAction struct {
6+
Action
7+
8+
Message string
9+
Data maps.Map
10+
}
11+
12+
func (this *SuccessAction) Name() string {
13+
return "success"
14+
}
15+
16+
func (this *SuccessAction) TypeId() int8 {
17+
return 1
18+
}

teacluster/action_sum.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package teacluster
2+
3+
import (
4+
"github.com/iwind/TeaGo/logs"
5+
)
6+
7+
// cluster -> master|node
8+
type SumAction struct {
9+
Action
10+
}
11+
12+
func (this *SumAction) Name() string {
13+
return "sum"
14+
}
15+
16+
func (this *SumAction) OnSuccess(success *SuccessAction) error {
17+
logs.Println("sum:", success.Data)
18+
19+
// write to local file
20+
//file := files.NewFile(Tea.ConfigFile("cluster.sum"))
21+
//file.WriteString()
22+
23+
return nil
24+
}
25+
26+
func (this *SumAction) OnFail(fail *FailAction) error {
27+
// TODO retry later
28+
return nil
29+
}
30+
31+
func (this *SumAction) TypeId() int8 {
32+
return 9
33+
}

teacluster/action_utils.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package teacluster
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"github.com/iwind/TeaGo/logs"
7+
"github.com/vmihailenco/msgpack"
8+
"reflect"
9+
"sync/atomic"
10+
)
11+
12+
var actionMap = map[int8]reflect.Type{}
13+
var actionId uint64
14+
15+
func RegisterActionType(actions ...ActionInterface) {
16+
for _, action := range actions {
17+
typeId := action.TypeId()
18+
_, ok := actionMap[typeId]
19+
if ok {
20+
logs.Error(errors.New("action type '" + fmt.Sprintf("%d", typeId) + "' already exist"))
21+
continue
22+
}
23+
24+
msgpack.RegisterExt(typeId, action)
25+
actionMap[typeId] = reflect.TypeOf(action).Elem()
26+
}
27+
}
28+
29+
func FindActionInstance(typeId int8) ActionInterface {
30+
t, ok := actionMap[typeId]
31+
if !ok {
32+
return nil
33+
}
34+
return reflect.New(t).Interface().(ActionInterface)
35+
}
36+
37+
func GenerateActionId() uint64 {
38+
return atomic.AddUint64(&actionId, 1)
39+
}

0 commit comments

Comments
 (0)