Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 52 additions & 41 deletions service/matching/handler/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type (
timeSource clock.TimeSource
failoverNotificationVersion int64
ShardDistributorMatchingConfig clientcommon.Config
selfAddress membership.HostInfo
}
)

Expand Down Expand Up @@ -142,6 +143,11 @@ func NewEngine(
shardDistributorClient executorclient.Client,
ShardDistributorMatchingConfig clientcommon.Config,
) Engine {
selfAddress, err := resolver.WhoAmI()
if err != nil {
logger.Fatal("failed to lookup self in membership", tag.Error(err))
}

e := &matchingEngineImpl{
shutdown: make(chan struct{}),
shutdownCompletion: &sync.WaitGroup{},
Expand All @@ -162,6 +168,7 @@ func NewEngine(
isolationState: isolationState,
timeSource: timeSource,
ShardDistributorMatchingConfig: ShardDistributorMatchingConfig,
selfAddress: selfAddress,
}

e.setupExecutor(shardDistributorClient)
Expand Down Expand Up @@ -1515,66 +1522,70 @@ func (e *matchingEngineImpl) emitInfoOrDebugLog(
}
}

func (e *matchingEngineImpl) errIfShardOwnershipLost(ctx context.Context, taskList *tasklist.Identifier) error {
if !e.config.EnableTasklistOwnershipGuard() {
return nil
func (e *matchingEngineImpl) stillOwnShard(ctx context.Context, taskList *tasklist.Identifier, reason, whoOwns *string) (bool, error) {
if e.isShuttingDown() {
*reason = "engine is shutting down"
return false, nil
}

// We have a shard-processor shared by all the task lists with the same name.
// For now there is no 1:1 mapping between shards and tasklists. (#tasklists >= #shards)
sp, err := e.executor.GetShardProcess(ctx, taskList.GetName())
if e.executor.IsOnboardedToSD() {
// We have a shard-processor shared by all the task lists with the same name.
// For now there is no 1:1 mapping between shards and tasklists. (#tasklists >= #shards)
sp, err := e.executor.GetShardProcess(ctx, taskList.GetName())
if err != nil {
return fmt.Errorf("failed to lookup ownership in SD: %w", err)
return false, fmt.Errorf("failed to lookup ownership in SD: %w", err)
}

if sp == nil {
return fmt.Errorf("failed to lookup ownership in SD: shard process is nil")
*reason = "shard process not found"
return false, nil
}
return nil
return true, nil
}

self, err := e.membershipResolver.WhoAmI()
taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName())
if err != nil {
return fmt.Errorf("failed to lookup self im membership: %w", err)
return false, fmt.Errorf("failed to lookup task list owner: %w", err)
}
if taskListOwner.Identity() != e.selfAddress.Identity() {
*reason = "engine does not own this shard"
*whoOwns = taskListOwner.Identity()
return false, nil
}

if e.isShuttingDown() {
e.logger.Warn("request to get tasklist is being rejected because engine is shutting down",
tag.WorkflowDomainID(taskList.GetDomainID()),
tag.WorkflowTaskListType(taskList.GetType()),
tag.WorkflowTaskListName(taskList.GetName()),
)
return true, nil
}

return cadence_errors.NewTaskListNotOwnedByHostError(
"not known",
self.Identity(),
taskList.GetName(),
)
// Defensive check to make sure we actually own the task list
//
// If we try to create a task list manager for a task list that is not owned by us, return an error
// The new task list manager will steal the task list from the current owner, which should only happen if
// the task list is owned by the current host.
func (e *matchingEngineImpl) errIfShardOwnershipLost(ctx context.Context, taskList *tasklist.Identifier) error {
if !e.config.EnableTasklistOwnershipGuard() {
return nil
}

// Defensive check to make sure we actually own the task list
// If we try to create a task list manager for a task list that is not owned by us, return an error
// The new task list manager will steal the task list from the current owner, which should only happen if
// the task list is owned by the current host.
taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName())
reason := "unknown reason"
whoOwns := "not known"
ok, err := e.stillOwnShard(ctx, taskList, &reason, &whoOwns)
if err != nil {
return fmt.Errorf("failed to lookup task list owner: %w", err)
return err
}

if taskListOwner.Identity() != self.Identity() {
e.logger.Warn("Request to get tasklist is being rejected because engine does not own this shard",
tag.WorkflowDomainID(taskList.GetDomainID()),
tag.WorkflowTaskListType(taskList.GetType()),
tag.WorkflowTaskListName(taskList.GetName()),
)
return cadence_errors.NewTaskListNotOwnedByHostError(
taskListOwner.Identity(),
self.Identity(),
taskList.GetName(),
)
if ok {
return nil
}

return nil
e.logger.Warn(fmt.Sprintf("request to get tasklist is being rejected because %s", reason),
tag.WorkflowDomainID(taskList.GetDomainID()),
tag.WorkflowTaskListType(taskList.GetType()),
tag.WorkflowTaskListName(taskList.GetName()),
)
return cadence_errors.NewTaskListNotOwnedByHostError(
whoOwns,
e.selfAddress.Identity(),
taskList.GetName(),
)
}

func (e *matchingEngineImpl) isShuttingDown() bool {
Expand Down
131 changes: 131 additions & 0 deletions service/matching/handler/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
cadence_errors "github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/metrics"
Expand Down Expand Up @@ -743,6 +744,136 @@ func TestIsShuttingDown(t *testing.T) {
assert.True(t, e.isShuttingDown())
}

func TestErrIfShardOwnershipLost(t *testing.T) {
taskListID, err := tasklist.NewIdentifier("test-domain-id", "test-tasklist", persistence.TaskListTypeActivity)
require.NoError(t, err)
selfIdentity := "this-host"

testCases := []struct {
name string
enableOwnershipGuard bool
onboardedToSD bool
setupMocks func(ctrl *gomock.Controller, resolver *membership.MockResolver, executor *executorclient.MockExecutor[tasklist.ShardProcessor], engine *matchingEngineImpl)
expectedErr error
}{
{
name: "ownership guard not enabled",
enableOwnershipGuard: false,
expectedErr: nil,
},
{
name: "engine is shutting down",
setupMocks: func(ctrl *gomock.Controller, resolver *membership.MockResolver, executor *executorclient.MockExecutor[tasklist.ShardProcessor], engine *matchingEngineImpl) {
close(engine.shutdown)
},
enableOwnershipGuard: true,
expectedErr: &cadence_errors.TaskListNotOwnedByHostError{
OwnedByIdentity: "not known",
MyIdentity: selfIdentity,
TasklistName: taskListID.GetName(),
},
},
{
name: "ownership lookup in SD fails",
onboardedToSD: true,
setupMocks: func(ctrl *gomock.Controller, resolver *membership.MockResolver, executor *executorclient.MockExecutor[tasklist.ShardProcessor], engine *matchingEngineImpl) {
executor.EXPECT().GetShardProcess(gomock.Any(), taskListID.GetName()).Return(nil, errors.New("sd down"))
},
enableOwnershipGuard: true,
expectedErr: errors.New("failed to lookup ownership in SD"),
},
{
name: "shard process not found in SD",
onboardedToSD: true,
setupMocks: func(ctrl *gomock.Controller, resolver *membership.MockResolver, executor *executorclient.MockExecutor[tasklist.ShardProcessor], engine *matchingEngineImpl) {
executor.EXPECT().GetShardProcess(gomock.Any(), taskListID.GetName()).Return(nil, nil)
},
enableOwnershipGuard: true,
expectedErr: &cadence_errors.TaskListNotOwnedByHostError{
OwnedByIdentity: "not known", // SD does not provide owner identity.
MyIdentity: selfIdentity,
TasklistName: taskListID.GetName(),
},
},
{
name: "owned when shard process exists in SD",
onboardedToSD: true,
setupMocks: func(ctrl *gomock.Controller, resolver *membership.MockResolver, executor *executorclient.MockExecutor[tasklist.ShardProcessor], engine *matchingEngineImpl) {
executor.EXPECT().GetShardProcess(gomock.Any(), taskListID.GetName()).Return(tasklist.NewMockShardProcessor(ctrl), nil)
},
enableOwnershipGuard: true,
expectedErr: nil,
},
{
name: "legacy ownership lookup fails",
enableOwnershipGuard: true,
onboardedToSD: false,
setupMocks: func(ctrl *gomock.Controller, resolver *membership.MockResolver, executor *executorclient.MockExecutor[tasklist.ShardProcessor], engine *matchingEngineImpl) {
resolver.EXPECT().Lookup(service.Matching, taskListID.GetName()).
Return(membership.NewDetailedHostInfo("", "", nil), errors.New("resolver down"))
},
expectedErr: errors.New("failed to lookup task list owner"),
},
{
name: "legacy owner mismatch returns not-owned error",
enableOwnershipGuard: true,
onboardedToSD: false,
setupMocks: func(ctrl *gomock.Controller, resolver *membership.MockResolver, executor *executorclient.MockExecutor[tasklist.ShardProcessor], engine *matchingEngineImpl) {
resolver.EXPECT().Lookup(service.Matching, taskListID.GetName()).
Return(membership.NewDetailedHostInfo("", "another-host", nil), nil)
},
expectedErr: &cadence_errors.TaskListNotOwnedByHostError{
OwnedByIdentity: "another-host",
MyIdentity: selfIdentity,
TasklistName: taskListID.GetName(),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
resolver := membership.NewMockResolver(ctrl)
executor := executorclient.NewMockExecutor[tasklist.ShardProcessor](ctrl)
executor.EXPECT().IsOnboardedToSD().Return(tc.onboardedToSD).AnyTimes()

engine := &matchingEngineImpl{
shutdown: make(chan struct{}),
executor: executor,
membershipResolver: resolver,
selfAddress: membership.NewDetailedHostInfo("", selfIdentity, nil),
config: &config.Config{
EnableTasklistOwnershipGuard: func(opts ...dynamicproperties.FilterOption) bool {
return tc.enableOwnershipGuard
},
},
logger: log.NewNoop(),
}

if tc.setupMocks != nil {
tc.setupMocks(ctrl, resolver, executor, engine)
}

err := engine.errIfShardOwnershipLost(context.Background(), taskListID)

if tc.expectedErr == nil {
require.NoError(t, err)
} else {
require.Error(t, err)

var expectedNotOwnedErr *cadence_errors.TaskListNotOwnedByHostError
if errors.As(tc.expectedErr, &expectedNotOwnedErr) {
var actualNotOwnedErr *cadence_errors.TaskListNotOwnedByHostError
require.ErrorAs(t, err, &actualNotOwnedErr)
assert.Equal(t, expectedNotOwnedErr, actualNotOwnedErr)
} else {
require.ErrorContains(t, err, tc.expectedErr.Error())
}
}
})
}
}

func TestGetTasklistsNotOwned(t *testing.T) {

ctrl := gomock.NewController(t)
Expand Down
Loading