Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ type flags struct {
ReplicateServiceAccounts bool
SyncByContent bool
ExcludeNamespaces string
ComponentsSyncPeriodS string
ComponentsSyncPeriod time.Duration
}
68 changes: 62 additions & 6 deletions liveness/handle.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package liveness

import (
"context"
"encoding/json"
"fmt"
"github.com/mittwald/kubernetes-replicator/replicate/common"
"net/http"
"sync"
"time"

log "github.com/sirupsen/logrus"

"github.com/mittwald/kubernetes-replicator/replicate/common"
)

type response struct {
Expand All @@ -15,30 +21,80 @@ type response struct {
// liveness status of the controller
type Handler struct {
Replicators []common.Replicator
SyncPeriod time.Duration

mu sync.Mutex
notReady []string
}

func (h *Handler) notReadyComponents() []string {
// RunSyncLoop starts an infinite synchronization loop of replicators
func (h *Handler) RunSyncLoop() {
h.notReady = make([]string, 0)
t := time.NewTicker(h.SyncPeriod)
defer t.Stop()

for {
h.notReadyComponents()
<-t.C
}
}

func (h *Handler) notReadyComponents() {
notReady := make([]string, 0)

now := time.Now()
for i := range h.Replicators {
synced := h.Replicators[i].Synced()
synced := h.checkReplicatorSynced(h.Replicators[i])

if !synced {
notReady = append(notReady, fmt.Sprintf("%T", h.Replicators[i]))
}
}
duration := time.Since(now)

h.mu.Lock()
h.notReady = notReady
h.mu.Unlock()

log.Infof("Sum of sync durations for all replicators: %v ms", duration.Milliseconds())
}

func (h *Handler) checkReplicatorSynced(replicator common.Replicator) bool {
var synced bool
syncedChan := make(chan bool, 1)

ctx, cancel := context.WithTimeout(context.Background(), h.SyncPeriod)
defer cancel()

now := time.Now()
go func() {
syncedChan <- replicator.Synced()
}()

select {
case synced = <-syncedChan:
case <-ctx.Done():
synced = false
log.Warnf("Timeout for sync replicator %s after %v", replicator.GetKind(), h.SyncPeriod)
}

duration := time.Since(now)

log.Infof("Sync duration for replicator %s: %v ms", replicator.GetKind(), duration.Milliseconds())

return notReady
return synced
}

//noinspection GoUnusedParameter
// noinspection GoUnusedParameter
func (h *Handler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/healthz" {
res.WriteHeader(http.StatusOK)
} else {
h.mu.Lock()
r := response{
NotReady: h.notReadyComponents(),
NotReady: h.notReady,
}
h.mu.Unlock()

if len(r.NotReady) > 0 {
res.WriteHeader(http.StatusServiceUnavailable)
Expand Down
12 changes: 9 additions & 3 deletions liveness/handle_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package liveness

import (
"github.com/mittwald/kubernetes-replicator/replicate/common"
v1 "k8s.io/api/core/v1"
"net/http"
"net/http/httptest"
"testing"

"github.com/mittwald/kubernetes-replicator/replicate/common"
v1 "k8s.io/api/core/v1"

"github.com/stretchr/testify/assert"
)

Expand All @@ -21,11 +22,15 @@ func (r *MockReplicator) Synced() bool {
return r.synced
}

//noinspection GoUnusedParameter
// noinspection GoUnusedParameter
func (r *MockReplicator) NamespaceAdded(ns *v1.Namespace) {
// Do nothing
}

func (r *MockReplicator) GetKind() string {
return ""
}

func buildReqRes(t *testing.T) (*http.Request, *httptest.ResponseRecorder) {
req, err := http.NewRequest("GET", "/status", nil)
res := httptest.NewRecorder()
Expand Down Expand Up @@ -57,6 +62,7 @@ func TestReturns503IfOneReplicatorIsNotSynced(t *testing.T) {
&MockReplicator{synced: true},
&MockReplicator{synced: false},
},
notReady: []string{"replicator2"},
}

handler.ServeHTTP(res, req)
Expand Down
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func init() {
flag.BoolVar(&f.ReplicateServiceAccounts, "replicate-service-accounts", true, "Enable replication of service accounts")
flag.BoolVar(&f.SyncByContent, "sync-by-content", false, "Always compare the contents of source and target resources and force them to be the same")
flag.StringVar(&f.ExcludeNamespaces, "exclude-namespaces", "", "Comma-separated list of regex patterns for namespaces to exclude from replication")
flag.StringVar(&f.ComponentsSyncPeriodS, "components-sync-period", "30s", "components sync period")
flag.Parse()

switch strings.ToUpper(strings.TrimSpace(f.LogLevel)) {
Expand All @@ -64,6 +65,10 @@ func init() {
if err != nil {
panic(err)
}
f.ComponentsSyncPeriod, err = time.ParseDuration(f.ComponentsSyncPeriodS)
if err != nil {
panic(err)
}

log.Debugf("using flag values %#v", f)
}
Expand Down Expand Up @@ -124,7 +129,9 @@ func main() {

h := liveness.Handler{
Replicators: enabledReplicators,
SyncPeriod: f.ComponentsSyncPeriod,
}
go h.RunSyncLoop()

log.Infof("starting liveness monitor at %s", f.StatusAddr)

Expand Down
4 changes: 3 additions & 1 deletion replicate/common/common.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package common

import (
"strings"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strings"
)

type Replicator interface {
Run()
Synced() bool
NamespaceAdded(ns *v1.Namespace)
GetKind() string
}

func PreviouslyPresentKeys(object *metav1.ObjectMeta) (map[string]struct{}, bool) {
Expand Down
4 changes: 4 additions & 0 deletions replicate/common/generic-replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ func (r *GenericReplicator) Run() {
r.Controller.Run(wait.NeverStop)
}

func (r *GenericReplicator) GetKind() string {
return r.Kind
}

// NamespaceAdded replicates resources with ReplicateTo and ReplicateToMatching
// annotations into newly created namespaces.
func (r *GenericReplicator) NamespaceAdded(ns *v1.Namespace) {
Expand Down