Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
3fca115
chore: add .osgrep to .gitignore
mlwelles Feb 4, 2026
988bb6e
refactor(sharding): introduce IsLabeled() helpers for protobuf types
mlwelles Feb 4, 2026
ff906a1
docs: add entity-level sub-tablet routing design
mlwelles Feb 4, 2026
ae1b495
docs: add entity-level sub-tablet routing implementation plan
mlwelles Feb 4, 2026
aa97543
feat(sharding): add TabletKey/ParseTabletKey composite key helpers
mlwelles Feb 4, 2026
a00b543
feat(sharding): add ServingSubTablet and ServingTablets to Zero state…
mlwelles Feb 4, 2026
11cac9c
feat(sharding): update handleTablet to use composite sub-tablet keys
mlwelles Feb 4, 2026
23182a4
feat(sharding): register dgraph.label as pre-defined reserved predicate
mlwelles Feb 4, 2026
432dade
feat(sharding): add entity label cache for UID -> label lookups
mlwelles Feb 4, 2026
affa2c4
feat(sharding): two-phase entity-label-aware mutation routing in popu…
mlwelles Feb 4, 2026
fad9ebf
feat(sharding): add AllSubTablets for query fan-out lookup
mlwelles Feb 4, 2026
9cf371d
feat(sharding): query fan-out across sub-tablets in ProcessTaskOverNe…
mlwelles Feb 4, 2026
605c1ed
feat(sharding): sort fan-out across sub-tablets in SortOverNetwork
mlwelles Feb 4, 2026
a8626ef
test(sharding): add entity-level routing integration test
mlwelles Feb 4, 2026
26e6423
feat(sharding): implement group-1 read for entity label cache miss
mlwelles Feb 4, 2026
695435b
feat(sharding): clear entity label cache on DropAll
mlwelles Feb 4, 2026
7a34b3f
fix(sharding): sort merged UIDs and fix tablet cache for entity-level…
mlwelles Feb 4, 2026
96636b3
feat(sharding): replace flat tablet maps with nested TabletIndex for …
mlwelles Feb 5, 2026
9a2e79b
refactor(sharding): simplify tablet API naming conventions
mlwelles Feb 5, 2026
4dc1278
chore(sharding): remove POC planning documents
mlwelles Feb 5, 2026
1831843
refactor(sharding): use IsLabeled() instead of raw Label == "" checks
mlwelles Feb 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ x/log_test/*.enc
##
.claude/
CLAUDE.md
.osgrep
17 changes: 12 additions & 5 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,20 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
if strings.Contains(pred, hnsw.VecKeyword) {
pred = pred[0:strings.Index(pred, hnsw.VecKeyword)]
}
tablet := s.ServingTablet(pred)
if tablet == nil {
tablets := s.ServingTablets(pred)
if len(tablets) == 0 {
return errors.Errorf("Tablet for %s is nil", pred)
}
if tablet.GroupId != uint32(gid) {
return errors.Errorf("Mutation done in group: %d. Predicate %s assigned to %d",
gid, pred, tablet.GroupId)
found := false
for _, t := range tablets {
if t.GroupId == uint32(gid) {
found = true
break
}
}
if !found {
return errors.Errorf("Mutation done in group: %d. Predicate %s not assigned there",
gid, pred)
}
if s.isBlocked(pred) {
return errors.Errorf("Commits on predicate %s are blocked due to predicate move", pred)
Expand Down
37 changes: 23 additions & 14 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ func (n *node) regenerateChecksum() {
g.Checksum = farm.Fingerprint64([]byte(strings.Join(preds, "")))
}

// Rebuild tablet index from authoritative flat proto maps.
idx := pb.NewTabletIndex()
for _, g := range state.GetGroups() {
idx.BuildFromFlat(g.GetTablets())
}
n.server.tabletIndex = idx

if n.AmLeader() {
// It is important to push something to Oracle updates channel, so the subscribers would
// get the latest checksum that we calculated above. Otherwise, if all the queries are
Expand Down Expand Up @@ -315,11 +322,14 @@ func (n *node) handleTablet(tablet *pb.Tablet) error {
if tablet.GroupId == 0 {
return errors.Errorf("Tablet group id is zero: %+v", tablet)
}

key := pb.TabletKey(tablet.Predicate, tablet.Label)

group := state.Groups[tablet.GroupId]
if tablet.Remove {
glog.Infof("Removing tablet for attr: [%v], gid: [%v]\n", tablet.Predicate, tablet.GroupId)
glog.Infof("Removing tablet for key: [%v], gid: [%v]\n", key, tablet.GroupId)
if group != nil {
delete(group.Tablets, tablet.Predicate)
delete(group.Tablets, key)
}
return nil
}
Expand All @@ -328,29 +338,28 @@ func (n *node) handleTablet(tablet *pb.Tablet) error {
state.Groups[tablet.GroupId] = group
}

// There's a edge case that we're handling.
// Two servers ask to serve the same tablet, then we need to ensure that
// only the first one succeeds.
if prev := n.server.servingTablet(tablet.Predicate); prev != nil {
// Duplicate detection: check if this (predicate, label) pair is already served.
// Multiple groups CAN serve the same predicate as long as they have different labels.
if prev := n.server.servingTablet(tablet.Predicate, tablet.Label); prev != nil {
if tablet.Force {
originalGroup := state.Groups[prev.GroupId]
delete(originalGroup.Tablets, tablet.Predicate)
} else if tablet.Label != "" && prev.Label != tablet.Label {
delete(originalGroup.Tablets, key)
} else if tablet.IsLabeled() && prev.Label != tablet.Label {
// Allow re-routing when labels differ. This happens when a schema with @label
// is applied after the predicate was created without a label.
glog.Infof("Tablet for attr: [%s] re-routing from group %d to %d due to label change (%q -> %q)",
tablet.Predicate, prev.GroupId, tablet.GroupId, prev.Label, tablet.Label)
glog.Infof("Tablet for key: [%s] re-routing from group %d to %d due to label change (%q -> %q)",
key, prev.GroupId, tablet.GroupId, prev.Label, tablet.Label)
originalGroup := state.Groups[prev.GroupId]
delete(originalGroup.Tablets, tablet.Predicate)
delete(originalGroup.Tablets, key)
} else if prev.GroupId != tablet.GroupId {
glog.Infof(
"Tablet for attr: [%s], gid: [%d] already served by group: [%d]\n",
prev.Predicate, tablet.GroupId, prev.GroupId)
"Tablet for key: [%s], gid: [%d] already served by group: [%d]\n",
key, tablet.GroupId, prev.GroupId)
return errTabletAlreadyServed
}
}
tablet.Force = false
group.Tablets[tablet.Predicate] = tablet
group.Tablets[key] = tablet
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/zero/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (s *Server) chooseTablet() (predicate string, srcGroup uint32, dstGroup uin
// Reserved predicates should always be in group 1 so do not re-balance them.
continue
}
if tab.Label != "" {
if tab.IsLabeled() {
// labeled predicates are pinned and should not be re-balanced either
continue
}
Expand Down
89 changes: 60 additions & 29 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type Server struct {
blockCommitsOn *sync.Map

checkpointPerGroup map[uint32]uint64

// tabletIndex is a nested index rebuilt from flat proto maps for O(1) lookups.
tabletIndex *pb.TabletIndex

// embedding the pb.UnimplementedZeroServer struct to ensure forward compatibility of the server.
pb.UnimplementedZeroServer
}
Expand All @@ -89,6 +93,7 @@ func (s *Server) Init() {
s.blockCommitsOn = new(sync.Map)
s.moveOngoing = make(chan struct{}, 1)
s.checkpointPerGroup = make(map[uint32]uint64)
s.tabletIndex = pb.NewTabletIndex()
if opts.limiterConfig.UidLeaseLimit > 0 {
// rate limiting is not enabled when lease limit is set to zero.
s.rateLimiter = x.NewRateLimiter(int64(opts.limiterConfig.UidLeaseLimit),
Expand Down Expand Up @@ -253,6 +258,12 @@ func (s *Server) SetMembershipState(state *pb.MembershipState) {
}

s.nextGroup = uint32(len(state.Groups) + 1)

// Rebuild the tablet index from flat proto maps.
s.tabletIndex = pb.NewTabletIndex()
for _, g := range state.Groups {
s.tabletIndex.BuildFromFlat(g.Tablets)
}
}

// MarshalMembershipState returns the marshaled membership state.
Expand Down Expand Up @@ -309,13 +320,23 @@ func (s *Server) removeZero(nodeId uint64) {
func (s *Server) ServingTablet(tablet string) *pb.Tablet {
s.RLock()
defer s.RUnlock()
pred, label := pb.ParseTabletKey(tablet)
return s.tabletIndex.Get(pred, label)
}

for _, group := range s.state.Groups {
if tab, ok := group.Tablets[tablet]; ok {
return tab
}
// ServingTablets returns all tablets for a given predicate across all groups.
func (s *Server) ServingTablets(predicate string) []*pb.Tablet {
s.RLock()
defer s.RUnlock()
labels := s.tabletIndex.AllForPredicate(predicate)
if labels == nil {
return nil
}
return nil
tablets := make([]*pb.Tablet, 0, len(labels))
for _, tab := range labels {
tablets = append(tablets, tab)
}
return tablets
}

func (s *Server) blockTablet(pred string) func() {
Expand All @@ -330,15 +351,11 @@ func (s *Server) isBlocked(pred string) bool {
return blocked
}

func (s *Server) servingTablet(tablet string) *pb.Tablet {
// servingTablet returns the tablet for the given (predicate, label) pair.
// Caller must hold at least a read lock.
func (s *Server) servingTablet(predicate, label string) *pb.Tablet {
s.AssertRLock()

for _, group := range s.state.Groups {
if tab, ok := group.Tablets[tablet]; ok {
return tab
}
}
return nil
return s.tabletIndex.Get(predicate, label)
}

func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
Expand Down Expand Up @@ -418,7 +435,7 @@ func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletR
tablets := make([]*pb.Tablet, 0)
unknownTablets := make([]*pb.Tablet, 0)
for _, t := range req.Tablets {
tab := s.ServingTablet(t.Predicate)
tab := s.ServingTablet(pb.TabletKey(t.Predicate, t.Label))
span.SetAttributes(attribute.String("tablet_predicate", t.Predicate))
switch {
case tab != nil && !t.Force:
Expand Down Expand Up @@ -456,7 +473,7 @@ func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletR
// This will also make it easier to restore the reserved predicates after
// a DropAll operation.
t.GroupId = 1
case t.Label != "":
case t.IsLabeled():
// Labeled predicate: route to matching labeled group
gid, err := s.labelGroup(t.Label)
if err != nil {
Expand Down Expand Up @@ -485,7 +502,7 @@ func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletR
}

for _, t := range unknownTablets {
tab := s.ServingTablet(t.Predicate)
tab := s.ServingTablet(pb.TabletKey(t.Predicate, t.Label))
x.AssertTrue(tab != nil)
span.AddEvent(fmt.Sprintf("Tablet served: %+v", tab))
tablets = append(tablets, tab)
Expand Down Expand Up @@ -705,23 +722,37 @@ func (s *Server) ShouldServe(
return resp, errors.Errorf("Group ID is Zero in %+v", tablet)
}

// Check who is serving this tablet.
tab := s.ServingTablet(tablet.Predicate)
// Use the index to find the exact (predicate, label) match.
tab := s.ServingTablet(pb.TabletKey(tablet.Predicate, tablet.Label))
span.SetAttributes(attribute.String("tablet_predicate", tablet.Predicate))
span.SetAttributes(attribute.String("tablet_label", tablet.Label))
if tab == nil && !tablet.IsLabeled() {
// Unlabeled request: check if any labeled tablet exists for this predicate.
s.RLock()
tab = s.tabletIndex.GetAny(tablet.Predicate)
s.RUnlock()
}
if tab != nil && !tablet.Force {
// If the existing tablet has a different label than requested, we need to re-route.
// This can happen when a schema is applied with @label after the predicate was
// created without a label (e.g., during DropAll).
if tablet.Label != "" && tab.Label != tablet.Label {
if tablet.IsLabeled() && tab.Label != tablet.Label {
glog.Infof("ShouldServe: tablet %s has label %q but request has label %q, re-routing",
tablet.Predicate, tab.Label, tablet.Label)
// Fall through to re-assign the tablet with the new label
// The handleTablet function will allow this because labels differ
// Fall through to re-assign the tablet with the new label.
} else {
// Someone is serving this tablet. Could be the caller as well.
// The caller should compare the returned group against the group it holds to check who's
// serving.
// Someone is serving this tablet. If the found tablet belongs to a
// different group than the requester, check if the requesting group
// serves a tablet for this predicate under a different label.
if tablet.GroupId > 0 && tab.GroupId != tablet.GroupId {
s.RLock()
labels := s.tabletIndex.AllForPredicate(tablet.Predicate)
for _, labelTab := range labels {
if labelTab.GroupId == tablet.GroupId {
s.RUnlock()
return labelTab, nil
}
}
s.RUnlock()
}
return tab, nil
}
}
Expand All @@ -746,7 +777,7 @@ func (s *Server) ShouldServe(
// This will also make it easier to restore the reserved predicates after
// a DropAll operation.
tablet.GroupId = 1
case tablet.Label != "":
case tablet.IsLabeled():
// Labeled predicate: route to matching labeled group
gid, err := s.labelGroup(tablet.Label)
if err != nil {
Expand All @@ -770,7 +801,7 @@ func (s *Server) ShouldServe(
span.AddEvent(fmt.Sprintf("Error proposing tablet: %+v. Error: %v", &proposal, err))
return tablet, err
}
tab = s.ServingTablet(tablet.Predicate)
tab = s.ServingTablet(pb.TabletKey(tablet.Predicate, tablet.Label))
x.AssertTrue(tab != nil)
span.SetAttributes(attribute.String("tablet_predicate_served", tablet.Predicate))
return tab, nil
Expand Down Expand Up @@ -931,7 +962,7 @@ func (s *Server) groupLabel(gid uint32) string {
return ""
}
for _, member := range group.Members {
if member.Label != "" {
if member.IsLabeled() {
return member.Label
}
}
Expand Down
Loading