diff --git a/.ci/clusters/values.yaml b/.ci/clusters/values.yaml index cf1fc360..f6063653 100644 --- a/.ci/clusters/values.yaml +++ b/.ci/clusters/values.yaml @@ -22,43 +22,34 @@ affinity: # disable auto recovery components: + functions: true autorecovery: false pulsar_manager: false - sql_worker: false proxy: false toolset: false ## disable monitoring stack -monitoring: - # monitoring - prometheus - prometheus: false - # monitoring - grafana - grafana: false - # monitoring - node_exporter - node_exporter: false - # alerting - alert-manager - alert_manager: false - # monitoring - loki - loki: false - # monitoring - datadog - datadog: false +victoria-metrics-k8s-stack: + enabled: false images: zookeeper: - repository: streamnative/sn-platform - tag: 2.10.4.3 + repository: apachepulsar/pulsar-all + tag: 4.1.2 bookie: - repository: streamnative/sn-platform - tag: 2.10.4.3 + repository: apachepulsar/pulsar-all + tag: 4.1.2 broker: - repository: streamnative/sn-platform - tag: 2.10.4.3 + repository: apachepulsar/pulsar-all + tag: 4.1.2 functions: - repository: streamnative/sn-platform - tag: 2.10.4.3 + repository: apachepulsar/pulsar-all + tag: 4.1.2 zookeeper: replicaCount: 1 + podMonitor: + enabled: false resources: requests: memory: 256Mi @@ -66,10 +57,8 @@ zookeeper: bookkeeper: replicaCount: 1 - metadata: - image: - repository: streamnative/sn-platform - tag: 2.10.4.3 + podMonitor: + enabled: false resources: requests: memory: 256Mi @@ -94,7 +83,6 @@ bookkeeper: -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions - -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=4 -XX:ConcGCThreads=4 @@ -106,11 +94,13 @@ bookkeeper: pulsar_metadata: image: - repository: streamnative/sn-platform - tag: 2.10.4.3 + repository: apachepulsar/pulsar-all + tag: 4.1.2 broker: replicaCount: 1 + podMonitor: + enabled: false configData: ## Enable `autoSkipNonRecoverableData` since bookkeeper is running ## without persistence @@ -119,16 +109,22 @@ broker: managedLedgerDefaultEnsembleSize: "1" managedLedgerDefaultWriteQuorum: "1" managedLedgerDefaultAckQuorum: "1" - enablePackagesManagement: "true" PULSAR_PREFIX_enablePackagesManagement: "true" + PULSAR_PREFIX_packagesManagementStorageProvider: org.apache.pulsar.packages.management.storage.bookkeeper.BookKeeperPackagesStorageProvider + PULSAR_PREFIX_packagesReplicas: "1" + PULSAR_PREFIX_packagesManagementLedgerRootPath: /ledgers + PULSAR_PREFIX_functionsWorkerEnablePackageManagement: "true" PULSAR_PREFIX_topicLevelPoliciesEnabled: "true" resources: requests: memory: 256Mi cpu: 10m -functions: - functionState: false - useDedicatedRunner: false - configData: - narExtractionDirectory: /pulsar/data +autorecovery: + podMonitor: + enabled: false + +proxy: + podMonitor: + enabled: false + diff --git a/.github/workflows/e2e_test.yml b/.github/workflows/e2e_test.yml index f4674f64..43f6d642 100644 --- a/.github/workflows/e2e_test.yml +++ b/.github/workflows/e2e_test.yml @@ -34,8 +34,9 @@ jobs: ALWAYS_UPDATE_PULSAR_RESOURCE: ${{ matrix.alwaysUpdatePulsar }} GOPRIVATE: github.com/streamnative ACCESS_TOKEN: ${{ secrets.SNBOT_GITHUB_TOKEN }} - IMAGE: streamnative/sn-platform:2.10.4.3 + IMAGE: apachepulsar/pulsar-all:4.1.2 WATCH_CERT_MANAGER_CRDS: "false" + K8S_VERSION: v1.34.2 steps: - name: Free Disk Space (Ubuntu) uses: jlumbroso/free-disk-space@v1.3.0 @@ -72,14 +73,13 @@ jobs: - name: Checkout uses: actions/checkout@v3 - # TODO the k8s version should be configurable - name: Setup K8s cluster run: | - curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.23.0/kind-linux-amd64 + curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.31.0/kind-linux-amd64 chmod +x ./kind export PATH="$PWD:$PATH" kind version - ./hack/kind-cluster-build.sh --nodeNum 1 --k8sVersion v1.22.17 + ./hack/kind-cluster-build.sh --nodeNum 1 --k8sVersion "$K8S_VERSION" - name: Initialize K8s cluster run: | @@ -102,17 +102,14 @@ jobs: run: | helm repo add streamnative https://charts.streamnative.io helm repo add jetstack https://charts.jetstack.io + helm repo add apache https://pulsar.apache.org/charts helm repo update - helm install cert-manager jetstack/cert-manager --set installCRDs=true --version v1.8.2 - + helm install cert-manager jetstack/cert-manager --set crds.enabled=true --version v1.19.2 rm -rf pulsar-charts/ - git clone --branch pulsar-operator-0.17.10 https://github.com/streamnative/charts.git pulsar-charts + git clone --branch pulsar-4.4.0 https://github.com/apache/pulsar-helm-chart pulsar-charts cd pulsar-charts/ ./scripts/pulsar/prepare_helm_release.sh -n default -k test -c - helm repo add grafana https://grafana.github.io/helm-charts - helm repo update - helm dependency update charts/pulsar - helm install test --set initialize=true --values ../.ci/clusters/values.yaml charts/pulsar + helm install --set initialize=true --values ../.ci/clusters/values.yaml test apache/pulsar df -h - name: Run Operator Test diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 70fcd944..75d3dd3e 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -45,10 +45,10 @@ jobs: - uses: actions/checkout@v3 - name: golangci-lint - uses: golangci/golangci-lint-action@v6 + uses: golangci/golangci-lint-action@v8 with: # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version - version: v1.64 + version: v2.7.2 args: --timeout=5m # Optional: working directory, useful for monorepos diff --git a/.gitignore b/.gitignore index 9609ee3f..c67deebe 100644 --- a/.gitignore +++ b/.gitignore @@ -52,3 +52,7 @@ node_modules/ .cursor .envrc + +docker-compose.yaml +mise.toml +.jj/ diff --git a/.golangci.yml b/.golangci.yml index 4486f42d..8ff54222 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,4 +1,4 @@ -# Copyright 2024 StreamNative +# Copyright 2025 StreamNative # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,26 +12,26 @@ # See the License for the specific language governing permissions and # limitations under the License. +version: "2" + +run: + go: "1.24" + build-tags: + - tools + - e2e + allow-parallel-runners: true + linters: - disable-all: true + default: none enable: - asciicheck - bodyclose - - unused - # - deadcode - # - depguard + - copyloopvar - dogsled - errcheck - - copyloopvar - # - gci - gocritic - # - gocyclo - # - godot - - gofmt - - goimports - goprintffuncname - gosec - - gosimple - govet - importas - ineffassign @@ -43,166 +43,148 @@ linters: - predeclared - rowserrcheck - staticcheck - # - structcheck - - stylecheck - thelper - - typecheck - unconvert - unparam - # - varcheck - -linters-settings: - godot: - # declarations - for top level declaration comments (default); - # toplevel - for top level comments; - # all - for all comments. - scope: toplevel - exclude: - - '^ \+.*' - - "^ ANCHOR.*" - gci: - sections: - - prefix(github.com/streamnative) + - unused + settings: + gocritic: + disabled-checks: + - appendAssign + - dupImport + - evalOrder + - ifElseChain + - octalLiteral + - regexpSimplify + - sloppyReassign + - truncateCmp + - typeDefFirst + - unnamedResult + - unnecessaryDefer + - whyNoLint + - wrapperFunc + enabled-tags: + - experimental + godot: + scope: toplevel + exclude: + - ^ \+.* + - ^ ANCHOR.* + gosec: + excludes: + - G307 + - G108 + importas: + alias: + - pkg: k8s.io/api/core/v1 + alias: corev1 + - pkg: k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1 + alias: apiextensionsv1 + - pkg: k8s.io/apimachinery/pkg/apis/meta/v1 + alias: metav1 + - pkg: k8s.io/apimachinery/pkg/api/errors + alias: apierrors + - pkg: k8s.io/apimachinery/pkg/util/errors + alias: kerrors + - pkg: sigs.k8s.io/controller-runtime + alias: ctrl + no-unaliased: true + nolintlint: + require-specific: true + allow-unused: false + exclusions: + generated: lax + rules: + - linters: + - revive + text: 'exported: exported method .*\.(Reconcile|SetupWithManager|SetupWebhookWithManager) should have comment or be unexported' + - linters: + - errcheck + text: Error return value of .((os\.)?std(out|err)\..*|.*Close|.*Flush|os\.Remove(All)?|.*print(f|ln)?|os\.(Un)?Setenv). is not checked + - linters: + - revive + text: exported (method|function|type|const) (.+) should have comment or be unexported + source: (func|type).*Fake.* + - linters: + - revive + path: fake_\.go + text: exported (method|function|type|const) (.+) should have comment or be unexported + - linters: + - revive + path: cmd/clusterctl/internal/test/providers.*.go + text: exported (method|function|type|const) (.+) should have comment or be unexported + - linters: + - revive + path: (framework|e2e)/.*.go + text: exported (method|function|type|const) (.+) should have comment or be unexported + - linters: + - unparam + text: always receives + - path: _test\.go + text: should not use dot imports + - path: (framework|e2e)/.*.go + text: should not use dot imports + - path: _test\.go + text: cyclomatic complexity + - linters: + - gocritic + text: 'appendAssign: append result not assigned to the same slice' + - linters: + - ifshort + path: controllers/mdutil/util.go + text: variable .* is only used in the if-statement + - linters: + - staticcheck + path: .*(api|types)\/.*\/conversion.*\.go$ + text: 'SA1019: in.(.+) is deprecated' + - linters: + - revive + path: .*(api|types)\/.*\/conversion.*\.go$ + text: exported (method|function|type|const) (.+) should have comment or be unexported + - linters: + - revive + path: .*(api|types)\/.*\/conversion.*\.go$ + text: 'var-naming: don''t use underscores in Go names;' + - linters: + - revive + path: .*(api|types)\/.*\/conversion.*\.go$ + text: 'receiver-naming: receiver name' + - linters: + - staticcheck + path: .*(api|types)\/.*\/conversion.*\.go$ + text: 'ST1003: should not use underscores in Go names;' + - linters: + - staticcheck + path: .*(api|types)\/.*\/conversion.*\.go$ + text: 'ST1016: methods on the same type should have the same receiver name' + - linters: + - ifshort + path: ^controllers/machine_controller\.go$ + text: variable 'isDeleteNodeAllowed' is only used in the if-statement.* + paths: + - zz_generated.*\.go$ + - third_party + - third_party$ + - builtin$ + - examples$ - importas: - no-unaliased: true - alias: - # Kubernetes - - pkg: k8s.io/api/core/v1 - alias: corev1 - - pkg: k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1 - alias: apiextensionsv1 - - pkg: k8s.io/apimachinery/pkg/apis/meta/v1 - alias: metav1 - - pkg: k8s.io/apimachinery/pkg/api/errors - alias: apierrors - - pkg: k8s.io/apimachinery/pkg/util/errors - alias: kerrors - # Controller Runtime - - pkg: sigs.k8s.io/controller-runtime - alias: ctrl - nolintlint: - allow-unused: false - require-specific: true - gosec: - excludes: - - G307 # Deferring unsafe method "Close" on type "\*os.File" - - G108 # Profiling endpoint is automatically exposed on /debug/pprof - gocritic: - enabled-tags: - - experimental - disabled-checks: - - appendAssign - - dupImport # https://github.com/go-critic/go-critic/issues/845 - - evalOrder - - ifElseChain - - octalLiteral - - regexpSimplify - - sloppyReassign - - truncateCmp - - typeDefFirst - - unnamedResult - - unnecessaryDefer - - whyNoLint - - wrapperFunc issues: - max-same-issues: 0 max-issues-per-linter: 0 - # We are disabling default golangci exclusions because we want to help reviewers to focus on reviewing the most relevant - # changes in PRs and avoid nitpicking. - exclude-use-default: false - exclude-files: - - "zz_generated.*\\.go$" - exclude-dirs: - - third_party - exclude-rules: - - linters: - - revive - text: "exported: exported method .*\\.(Reconcile|SetupWithManager|SetupWebhookWithManager) should have comment or be unexported" - - linters: - - errcheck - text: Error return value of .((os\.)?std(out|err)\..*|.*Close|.*Flush|os\.Remove(All)?|.*print(f|ln)?|os\.(Un)?Setenv). is not checked - # Exclude some packages or code to require comments, for example test code, or fake clients. - - linters: - - revive - text: exported (method|function|type|const) (.+) should have comment or be unexported - source: (func|type).*Fake.* - - linters: - - revive - text: exported (method|function|type|const) (.+) should have comment or be unexported - path: fake_\.go - - linters: - - revive - text: exported (method|function|type|const) (.+) should have comment or be unexported - path: cmd/clusterctl/internal/test/providers.*.go - - linters: - - revive - text: exported (method|function|type|const) (.+) should have comment or be unexported - path: "(framework|e2e)/.*.go" - # Disable unparam "always receives" which might not be really - # useful when building libraries. - - linters: - - unparam - text: always receives - # Dot imports for gomega or ginkgo are allowed - # within test files. - - path: _test\.go - text: should not use dot imports - - path: (framework|e2e)/.*.go - text: should not use dot imports - - path: _test\.go - text: cyclomatic complexity - # Append should be able to assign to a different var/slice. - - linters: - - gocritic - text: "appendAssign: append result not assigned to the same slice" - # ifshort flags variables that are only used in the if-statement even though there is - # already a SimpleStmt being used in the if-statement in question. - - linters: - - ifshort - text: "variable .* is only used in the if-statement" - path: controllers/mdutil/util.go - # Disable linters for conversion - - linters: - - staticcheck - text: "SA1019: in.(.+) is deprecated" - path: .*(api|types)\/.*\/conversion.*\.go$ - - linters: - - revive - text: exported (method|function|type|const) (.+) should have comment or be unexported - path: .*(api|types)\/.*\/conversion.*\.go$ - - linters: - - revive - text: "var-naming: don't use underscores in Go names;" - path: .*(api|types)\/.*\/conversion.*\.go$ - - linters: - - revive - text: "receiver-naming: receiver name" - path: .*(api|types)\/.*\/conversion.*\.go$ - - linters: - - stylecheck - text: "ST1003: should not use underscores in Go names;" - path: .*(api|types)\/.*\/conversion.*\.go$ - - linters: - - stylecheck - text: "ST1016: methods on the same type should have the same receiver name" - path: .*(api|types)\/.*\/conversion.*\.go$ - # hack/tools - - linters: - - typecheck - text: import (".+") is a program, not an importable package - path: ^tools\.go$ - # Ignore ifshort false positive - # TODO(sbueringer) false positive: https://github.com/esimonov/ifshort/issues/23 - - linters: - - ifshort - text: "variable 'isDeleteNodeAllowed' is only used in the if-statement.*" - path: ^controllers/machine_controller\.go$ + max-same-issues: 0 -run: - timeout: 10m - build-tags: - - tools - - e2e - allow-parallel-runners: true - go: "1.24" +formatters: + enable: + - gofmt + - goimports + settings: + gci: + sections: + - prefix(github.com/streamnative) + exclusions: + generated: lax + paths: + - zz_generated.*\.go$ + - third_party + - third_party$ + - builtin$ + - examples$ diff --git a/README.md b/README.md index 17fa1160..c9ab7048 100644 --- a/README.md +++ b/README.md @@ -44,9 +44,9 @@ You can install the Pulsar Resources Operator using the officially supported `pu ## Prerequisites -- Install [`kubectl`](https://kubernetes.io/docs/tasks/tools/#kubectl) (v1.16 - v1.24), compatible with your cluster (+/- 1 minor release from your cluster). +- Install [`kubectl`](https://kubernetes.io/docs/tasks/tools/#kubectl) (v1.16 - v1.34), compatible with your cluster (+/- 1 minor release from your cluster). - Install [`Helm`](https://helm.sh/docs/intro/install/) (v3.0.2 or higher). -- Prepare a Kubernetes cluster (v1.16 - v1.24). +- Prepare a Kubernetes cluster (v1.16 - v1.34). - Prepare a [Pulsar cluster](https://docs.streamnative.io/operators/pulsar-operator/tutorial/deploy-pulsar) diff --git a/controllers/secret_controller.go b/controllers/secret_controller.go index 3f6b6410..815e3297 100644 --- a/controllers/secret_controller.go +++ b/controllers/secret_controller.go @@ -136,7 +136,7 @@ func (r *SecretReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr finalizerName := cloudapi.SecretFinalizer // Handle deletion - if !secretCR.ObjectMeta.DeletionTimestamp.IsZero() { + if !secretCR.DeletionTimestamp.IsZero() { if controllerutil.ContainsFinalizer(secretCR, finalizerName) { if err := secretClient.DeleteSecret(ctx, secretCR); err != nil { // If the remote secret is already gone, that's okay. diff --git a/controllers/serviceaccountbinding_controller.go b/controllers/serviceaccountbinding_controller.go index b51fa5e9..3b09485b 100644 --- a/controllers/serviceaccountbinding_controller.go +++ b/controllers/serviceaccountbinding_controller.go @@ -237,7 +237,7 @@ func (r *ServiceAccountBindingReconciler) Reconcile(ctx context.Context, req ctr } } else { // Remote binding exists. - logger.Info("Remote ServiceAccountBinding already exists", "bindingName", remoteName, "poolMemberRef", poolMemberRef, "existingRemoteName", existingRemoteBinding.ObjectMeta.Name) + logger.Info("Remote ServiceAccountBinding already exists", "bindingName", remoteName, "poolMemberRef", poolMemberRef, "existingRemoteName", existingRemoteBinding.Name) // TODO: Implement update logic if necessary. // Compare existingRemoteBinding.Spec with what payloadForClient would generate via conversion. // For now, we assume if it exists, it's correctly configured or updates are not handled here. diff --git a/go.sum b/go.sum index d11a820f..3404aa0e 100644 --- a/go.sum +++ b/go.sum @@ -110,6 +110,7 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 h1:Om6kYQYDUk5wWbT0t0q6pvyM49i9XZAv9dDrkDA7gjk= +github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0= github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0= diff --git a/hack/kind-cluster-build.sh b/hack/kind-cluster-build.sh index e004fe37..e37b2bf6 100755 --- a/hack/kind-cluster-build.sh +++ b/hack/kind-cluster-build.sh @@ -39,7 +39,7 @@ Options: -k,--k8sVersion version of the Kubernetes cluster,default value: v1.12.8 -v,--volumeNum the volumes number of each kubernetes node,default value: 9 Usage: - $0 --name testCluster --nodeNum 4 --k8sVersion v1.12.9 + $0 --name testCluster --nodeNum 4 --k8sVersion v1.35.0 EOF } @@ -82,7 +82,7 @@ done clusterName=${clusterName:-pulsar-dev} nodeNum=${nodeNum:-6} -k8sVersion=${k8sVersion:-v1.19.11} +k8sVersion=${k8sVersion:-v1.34.2} volumeNum=${volumeNum:-9} echo "clusterName: ${clusterName}" diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index 88806cf0..c196ab00 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -223,10 +223,7 @@ func (p *PulsarAdminClient) DeleteTopic(name string) error { if err != nil { return err } - nonPartitioned := true - if topicMeta.Partitions > 0 { - nonPartitioned = false - } + nonPartitioned := topicMeta.Partitions < 1 if err := p.adminClient.Topics().Delete(*topic, true, nonPartitioned); err != nil { return err } diff --git a/scripts/lint.sh b/scripts/lint.sh index 4c31eaef..1fd69a7d 100755 --- a/scripts/lint.sh +++ b/scripts/lint.sh @@ -28,7 +28,7 @@ export POP_HOME=`cd $BINDIR/..;pwd` if [ ! -f ${POP_HOME}/bin/golangci-lint ]; then cd ${POP_HOME} - wget -O - -q https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s v1.55.2 + wget -O - -q https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s v2.7.2 cd - fi ${POP_HOME}/bin/golangci-lint --version diff --git a/tests/operator/resources_test.go b/tests/operator/resources_test.go index e9e4f320..8aa9faf6 100644 --- a/tests/operator/resources_test.go +++ b/tests/operator/resources_test.go @@ -32,18 +32,13 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/utils/pointer" + "k8s.io/utils/ptr" v1alphav1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" rutils "github.com/streamnative/pulsar-resources-operator/pkg/utils" "github.com/streamnative/pulsar-resources-operator/tests/utils" ) -type testJSON struct { - ID int `json:"id"` - Name string `json:"name"` -} - // validatePermissionStateAnnotation validates the PulsarPermission state annotation func validatePermissionStateAnnotation(ctx context.Context, permissionName, namespaceName string, expected connection.PulsarPermissionState) { permission := &v1alphav1.PulsarPermission{} @@ -93,9 +88,10 @@ func validatePermissionStateAnnotation(ctx context.Context, permissionName, name // but allows for additional roles to exist (for multi-permission testing) func validateTopicPermissionsContain(topicName string, expectedRoles []string) { Eventually(func(g Gomega) { + ctx := context.TODO() podName := fmt.Sprintf("%s-broker-0", brokerName) containerName := fmt.Sprintf("%s-broker", brokerName) - stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + stdout, _, err := utils.ExecInPod(ctx, k8sConfig, namespaceName, podName, containerName, "./bin/pulsar-admin topics permissions "+topicName) g.Expect(err).Should(Succeed()) g.Expect(stdout).Should(Not(BeEmpty())) @@ -112,27 +108,27 @@ var _ = Describe("Resources", func() { var ( ctx context.Context pconn *v1alphav1.PulsarConnection - pconnName string = "test-connection" + pconnName = "test-connection" ptenant *v1alphav1.PulsarTenant - ptenantName string = "test-tenant" - tenantName string = "cloud" + ptenantName = "test-tenant" + tenantName = "cloud" lifecyclePolicy v1alphav1.PulsarResourceLifeCyclePolicy pnamespace *v1alphav1.PulsarNamespace - pnamespaceName string = "test-namespace" - pulsarNamespaceName string = "cloud/stage" + pnamespaceName = "test-namespace" + pulsarNamespaceName = "cloud/stage" ptopic *v1alphav1.PulsarTopic - ptopicName string = "test-topic" - topicName string = "persistent://cloud/stage/user" + ptopicName = "test-topic" + topicName = "persistent://cloud/stage/user" ptopic2 *v1alphav1.PulsarTopic - ptopicName2 string = "test-topic2" - topicName2 string = "persistent://cloud/stage/user2" + ptopicName2 = "test-topic2" + topicName2 = "persistent://cloud/stage/user2" ppermission *v1alphav1.PulsarPermission - ppermissionName string = "test-permission" + ppermissionName = "test-permission" ppermission2 *v1alphav1.PulsarPermission - ppermission2Name string = "test-permission-2" + ppermission2Name = "test-permission-2" ppermission3 *v1alphav1.PulsarPermission - ppermission3Name string = "test-permission-3" - exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," + + ppermission3Name = "test-permission-3" + exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," + "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}" partitionedTopic = &v1alphav1.PulsarTopic{ ObjectMeta: metav1.ObjectMeta{ @@ -141,27 +137,27 @@ var _ = Describe("Resources", func() { }, Spec: v1alphav1.PulsarTopicSpec{ Name: "persistent://cloud/stage/partitioned-topic", - Partitions: pointer.Int32(1), + Partitions: ptr.To[int32](1), ConnectionRef: corev1.LocalObjectReference{ Name: pconnName, }, }, } ppackage *v1alphav1.PulsarPackage - ppackageurl string = "function://public/default/api-examples@v3.2.3.3" - pfuncName string = "test-func" - pfuncFailureName string = "func-test-failure" - psinkName string = "test-sink" - psourceName string = "test-source" - pclusterName string = "test-pulsar" - pnsIsolationPolicyName string = "test-ns-isolation-policy" + ppackageurl = "function://public/default/api-examples@v3.2.3.3" + pfuncName = "test-func" + pfuncFailureName = "func-test-failure" + psinkName = "test-sink" + psourceName = "test-source" + pclusterName = "test-pulsar" + pnsIsolationPolicyName = "test-ns-isolation-policy" pfunc *v1alphav1.PulsarFunction pfuncfailure *v1alphav1.PulsarFunction - psinkpackageurl string = "builtin://data-generator" + psinkpackageurl = "builtin://data-generator" psink *v1alphav1.PulsarSink psource *v1alphav1.PulsarSource pnsisolationpolicy *v1alphav1.PulsarNSIsolationPolicy - psourcepackageurl string = "builtin://data-generator" + psourcepackageurl = "builtin://data-generator" ) BeforeEach(func() { @@ -211,7 +207,7 @@ var _ = Describe("Resources", func() { It("should create the pulsar broker successfully", func() { Eventually(func() bool { statefulset := &v1.StatefulSet{} - k8sClient.Get(ctx, types.NamespacedName{ + _ = k8sClient.Get(ctx, types.NamespacedName{ Name: brokerName + "-broker", Namespace: namespaceName, }, statefulset) @@ -294,7 +290,7 @@ var _ = Describe("Resources", func() { // Update TopicAutoCreationConfig ns.Spec.TopicAutoCreationConfig.Allow = false ns.Spec.TopicAutoCreationConfig.Type = "non-partitioned" - ns.Spec.TopicAutoCreationConfig.Partitions = pointer.Int32(5) + ns.Spec.TopicAutoCreationConfig.Partitions = ptr.To[int32](5) err := k8sClient.Update(ctx, ns) Expect(err).Should(Succeed()) @@ -356,15 +352,15 @@ var _ = Describe("Resources", func() { Eventually(func(g Gomega) { podName := fmt.Sprintf("%s-broker-0", brokerName) containerName := fmt.Sprintf("%s-broker", brokerName) - stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, - "./bin/pulsarctl -s http://localhost:8080 --token=$PROXY_TOKEN schemas get "+ptopic.Spec.Name) + stdout, _, err := utils.ExecInPod(ctx, k8sConfig, namespaceName, podName, containerName, + "./bin/pulsar-admin schemas get "+ptopic.Spec.Name) g.Expect(err).Should(Succeed()) g.Expect(stdout).Should(Not(BeEmpty())) format.MaxLength = 0 g.Expect(stdout).Should(ContainSubstring("JSON")) - stdout, _, err = utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, - "./bin/pulsarctl -s http://localhost:8080 --token=$PROXY_TOKEN schemas get "+ptopic2.Spec.Name) + stdout, _, err = utils.ExecInPod(ctx, k8sConfig, namespaceName, podName, containerName, + "./bin/pulsar-admin schemas get "+ptopic2.Spec.Name) g.Expect(err).Should(Succeed()) g.Expect(stdout).Should(Not(BeEmpty())) format.MaxLength = 0 @@ -376,13 +372,10 @@ var _ = Describe("Resources", func() { podName := fmt.Sprintf("%s-broker-0", brokerName) containerName := fmt.Sprintf("%s-broker", brokerName) - By("delete topic2 with pulsarctl") - _, stderr, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, - "./bin/pulsarctl -s http://localhost:8080 --token=$PROXY_TOKEN "+ - "topics delete -f --non-partitioned "+ptopic2.Spec.Name) + By("delete topic2 with pulsar-admin") + _, _, err := utils.ExecInPod(ctx, k8sConfig, namespaceName, podName, containerName, + "./bin/pulsar-admin topics delete -f "+ptopic2.Spec.Name) Expect(err).ShouldNot(HaveOccurred()) - format.MaxLength = 0 - Expect(stderr).Should(ContainSubstring("successfully")) By("delete topic1 schema in k8s") topic := &v1alphav1.PulsarTopic{} @@ -390,16 +383,6 @@ var _ = Describe("Resources", func() { Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) topic.Spec.SchemaInfo = nil Expect(k8sClient.Update(ctx, topic)).Should(Succeed()) - - // By("check topic1 schema is deleted in pulsar") - // Eventually(func(g Gomega) { - // _, stderr, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, - // "./bin/pulsarctl -s http://localhost:8080 --token=$PROXY_TOKEN schemas get "+ptopic.Spec.Name) - // g.Expect(err).ShouldNot(BeNil()) - // g.Expect(stderr).Should(Not(BeEmpty())) - // format.MaxLength = 0 - // g.Expect(stderr).Should(ContainSubstring("404")) - // }, "5s", "100ms").Should(Succeed()) }) It("should increase the partitions successfully", func() { @@ -409,7 +392,7 @@ var _ = Describe("Resources", func() { Name: partitionedTopic.Name, }, curTopic)).ShouldNot(HaveOccurred()) - curTopic.Spec.Partitions = pointer.Int32(2) + curTopic.Spec.Partitions = ptr.To[int32](2) err := k8sClient.Update(ctx, curTopic) Expect(err).ShouldNot(HaveOccurred()) Eventually(func() bool { @@ -425,8 +408,8 @@ var _ = Describe("Resources", func() { Context("PulsarTopic Compaction Threshold", Ordered, func() { var ( compactionTopic *v1alphav1.PulsarTopic - compactionTopicName string = "test-compaction-topic" - compactionThreshold int64 = 104857600 // 100MB in bytes + compactionTopicName = "test-compaction-topic" + compactionThreshold int64 = 104857600 // 100MB in bytes ) BeforeAll(func() { @@ -510,7 +493,7 @@ var _ = Describe("Resources", func() { return true } - var state map[string]interface{} + var state map[string]any Expect(json.Unmarshal([]byte(rawState), &state)).Should(Succeed()) value, ok := state["compactionThreshold"] return !ok || value == nil @@ -533,7 +516,7 @@ var _ = Describe("Resources", func() { Context("PulsarTopic Persistence Policies", Ordered, func() { var ( persistenceTopic *v1alphav1.PulsarTopic - persistenceTopicName string = "test-persistence-topic" + persistenceTopicName = "test-persistence-topic" ) BeforeAll(func() { @@ -579,9 +562,9 @@ var _ = Describe("Resources", func() { Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) // Update persistence policies - topic.Spec.PersistencePolicies.BookkeeperEnsemble = pointer.Int32(5) - topic.Spec.PersistencePolicies.BookkeeperWriteQuorum = pointer.Int32(3) - topic.Spec.PersistencePolicies.BookkeeperAckQuorum = pointer.Int32(3) + topic.Spec.PersistencePolicies.BookkeeperEnsemble = ptr.To[int32](5) + topic.Spec.PersistencePolicies.BookkeeperWriteQuorum = ptr.To[int32](3) + topic.Spec.PersistencePolicies.BookkeeperAckQuorum = ptr.To[int32](3) err := k8sClient.Update(ctx, topic) Expect(err).Should(Succeed()) }) @@ -610,7 +593,7 @@ var _ = Describe("Resources", func() { Context("PulsarTopic Delayed Delivery", Ordered, func() { var ( delayedDeliveryTopic *v1alphav1.PulsarTopic - delayedDeliveryTopicName string = "test-delayed-delivery-topic" + delayedDeliveryTopicName = "test-delayed-delivery-topic" ) BeforeAll(func() { @@ -654,7 +637,7 @@ var _ = Describe("Resources", func() { Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) // Update delayed delivery - topic.Spec.DelayedDelivery.TickTimeMillis = pointer.Int64(2000) + topic.Spec.DelayedDelivery.TickTimeMillis = ptr.To[int64](2000) err := k8sClient.Update(ctx, topic) Expect(err).Should(Succeed()) }) @@ -683,7 +666,7 @@ var _ = Describe("Resources", func() { Context("PulsarTopic Dispatch Rate", Ordered, func() { var ( dispatchRateTopic *v1alphav1.PulsarTopic - dispatchRateTopicName string = "test-dispatch-rate-topic" + dispatchRateTopicName = "test-dispatch-rate-topic" ) BeforeAll(func() { @@ -728,8 +711,8 @@ var _ = Describe("Resources", func() { Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) // Update dispatch rate - topic.Spec.DispatchRate.DispatchThrottlingRateInMsg = pointer.Int32(1000) - topic.Spec.DispatchRate.DispatchThrottlingRateInByte = pointer.Int64(1048576) + topic.Spec.DispatchRate.DispatchThrottlingRateInMsg = ptr.To[int32](1000) + topic.Spec.DispatchRate.DispatchThrottlingRateInByte = ptr.To[int64](1048576) err := k8sClient.Update(ctx, topic) Expect(err).Should(Succeed()) }) @@ -758,7 +741,7 @@ var _ = Describe("Resources", func() { Context("PulsarTopic Publish Rate", Ordered, func() { var ( publishRateTopic *v1alphav1.PulsarTopic - publishRateTopicName string = "test-publish-rate-topic" + publishRateTopicName = "test-publish-rate-topic" ) BeforeAll(func() { @@ -802,8 +785,8 @@ var _ = Describe("Resources", func() { Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) // Update publish rate - topic.Spec.PublishRate.PublishThrottlingRateInMsg = pointer.Int32(2000) - topic.Spec.PublishRate.PublishThrottlingRateInByte = pointer.Int64(2097152) + topic.Spec.PublishRate.PublishThrottlingRateInMsg = ptr.To[int32](2000) + topic.Spec.PublishRate.PublishThrottlingRateInByte = ptr.To[int64](2097152) err := k8sClient.Update(ctx, topic) Expect(err).Should(Succeed()) }) @@ -832,7 +815,7 @@ var _ = Describe("Resources", func() { Context("PulsarTopic Inactive Topic Policies", Ordered, func() { var ( inactiveTopicPoliciesTopic *v1alphav1.PulsarTopic - inactiveTopicPoliciesTopicName string = "test-inactive-topic-policies-topic" + inactiveTopicPoliciesTopicName = "test-inactive-topic-policies-topic" ) BeforeAll(func() { @@ -877,8 +860,8 @@ var _ = Describe("Resources", func() { Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) // Update inactive topic policies - topic.Spec.InactiveTopicPolicies.MaxInactiveDurationInSeconds = pointer.Int32(3600) - topic.Spec.InactiveTopicPolicies.DeleteWhileInactive = pointer.Bool(false) + topic.Spec.InactiveTopicPolicies.MaxInactiveDurationInSeconds = ptr.To[int32](3600) + topic.Spec.InactiveTopicPolicies.DeleteWhileInactive = ptr.To(false) err := k8sClient.Update(ctx, topic) Expect(err).Should(Succeed()) }) @@ -907,7 +890,7 @@ var _ = Describe("Resources", func() { Context("PulsarTopic Subscribe Rate", Ordered, func() { var ( subscribeRateTopic *v1alphav1.PulsarTopic - subscribeRateTopicName string = "test-subscribe-rate-topic" + subscribeRateTopicName = "test-subscribe-rate-topic" ) BeforeAll(func() { @@ -951,8 +934,8 @@ var _ = Describe("Resources", func() { Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) // Update subscribe rate - topic.Spec.SubscribeRate.SubscribeThrottlingRatePerConsumer = pointer.Int32(20) - topic.Spec.SubscribeRate.RatePeriodInSecond = pointer.Int32(60) + topic.Spec.SubscribeRate.SubscribeThrottlingRatePerConsumer = ptr.To[int32](20) + topic.Spec.SubscribeRate.RatePeriodInSecond = ptr.To[int32](60) err := k8sClient.Update(ctx, topic) Expect(err).Should(Succeed()) }) @@ -981,7 +964,7 @@ var _ = Describe("Resources", func() { Context("PulsarTopic Offload Policies", Ordered, func() { var ( offloadPoliciesTopic *v1alphav1.PulsarTopic - offloadPoliciesTopicName string = "test-offload-policies-topic" + offloadPoliciesTopicName = "test-offload-policies-topic" ) BeforeAll(func() { @@ -1056,7 +1039,7 @@ var _ = Describe("Resources", func() { Context("PulsarTopic Auto Subscription Creation", Ordered, func() { var ( autoSubscriptionTopic *v1alphav1.PulsarTopic - autoSubscriptionTopicName string = "test-auto-subscription-topic" + autoSubscriptionTopicName = "test-auto-subscription-topic" ) BeforeAll(func() { @@ -1128,7 +1111,7 @@ var _ = Describe("Resources", func() { Context("PulsarTopic Schema Compatibility Strategy", Ordered, func() { var ( schemaCompatibilityTopic *v1alphav1.PulsarTopic - schemaCompatibilityTopicName string = "test-schema-compatibility-topic" + schemaCompatibilityTopicName = "test-schema-compatibility-topic" ) BeforeAll(func() { @@ -1201,20 +1184,20 @@ var _ = Describe("Resources", func() { Context("PulsarTopic Infinite Retention Policies", Ordered, func() { var ( infiniteRetentionTopic *v1alphav1.PulsarTopic - infiniteRetentionTopicName string = "test-infinite-retention-topic" - infiniteRetentionFullTopicName string = "persistent://cloud/stage/infinite-retention-test" + infiniteRetentionTopicName = "test-infinite-retention-topic" + infiniteRetentionFullTopicName = "persistent://cloud/stage/infinite-retention-test" infiniteTimeTopic *v1alphav1.PulsarTopic - infiniteTimeTopicName string = "test-infinite-time-topic" - infiniteTimeFullTopicName string = "persistent://cloud/stage/infinite-time-test" + infiniteTimeTopicName = "test-infinite-time-topic" + infiniteTimeFullTopicName = "persistent://cloud/stage/infinite-time-test" infiniteSizeTopic *v1alphav1.PulsarTopic - infiniteSizeTopicName string = "test-infinite-size-topic" - infiniteSizeFullTopicName string = "persistent://cloud/stage/infinite-size-test" + infiniteSizeTopicName = "test-infinite-size-topic" + infiniteSizeFullTopicName = "persistent://cloud/stage/infinite-size-test" finiteRetentionTopic *v1alphav1.PulsarTopic - finiteRetentionTopicName string = "test-finite-retention-topic" - finiteRetentionFullTopicName string = "persistent://cloud/stage/finite-retention-test" + finiteRetentionTopicName = "test-finite-retention-topic" + finiteRetentionFullTopicName = "persistent://cloud/stage/finite-retention-test" ) BeforeAll(func() { @@ -1484,8 +1467,8 @@ var _ = Describe("Resources", func() { Context("PulsarNamespace Rate Limiting", Ordered, func() { var ( rateLimitingNamespace *v1alphav1.PulsarNamespace - rateLimitingNamespaceName string = "test-ratelimiting-namespace" - rateLimitingPulsarNSName string = "cloud/ratelimiting" + rateLimitingNamespaceName = "test-ratelimiting-namespace" + rateLimitingPulsarNSName = "cloud/ratelimiting" ) BeforeAll(func() { @@ -1576,8 +1559,8 @@ var _ = Describe("Resources", func() { Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) // Update DispatchRate - ns.Spec.DispatchRate.DispatchThrottlingRateInMsg = pointer.Int32(1500) - ns.Spec.DispatchRate.DispatchThrottlingRateInByte = pointer.Int64(1572864) // 1.5MB + ns.Spec.DispatchRate.DispatchThrottlingRateInMsg = ptr.To[int32](1500) + ns.Spec.DispatchRate.DispatchThrottlingRateInByte = ptr.To[int64](1572864) // 1.5MB err := k8sClient.Update(ctx, ns) Expect(err).Should(Succeed()) @@ -1619,8 +1602,8 @@ var _ = Describe("Resources", func() { Context("PulsarNamespace Storage Policies", Ordered, func() { var ( storagePoliciesNamespace *v1alphav1.PulsarNamespace - storagePoliciesNamespaceName string = "test-storage-namespace" - storagePoliciesPulsarNSName string = "cloud/storage" + storagePoliciesNamespaceName = "test-storage-namespace" + storagePoliciesPulsarNSName = "cloud/storage" ) BeforeAll(func() { @@ -1715,7 +1698,7 @@ var _ = Describe("Resources", func() { podName := fmt.Sprintf("%s-broker-0", brokerName) containerName := fmt.Sprintf("%s-broker", brokerName) Eventually(func(g Gomega) { - stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + stdout, _, err := utils.ExecInPod(ctx, k8sConfig, namespaceName, podName, containerName, "./bin/pulsar-admin namespaces get-subscription-expiration-time "+storagePoliciesPulsarNSName) g.Expect(err).Should(Succeed()) // 7d -> 10080 minutes @@ -1757,7 +1740,7 @@ var _ = Describe("Resources", func() { podName := fmt.Sprintf("%s-broker-0", brokerName) containerName := fmt.Sprintf("%s-broker", brokerName) Eventually(func(g Gomega) { - stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + stdout, _, err := utils.ExecInPod(ctx, k8sConfig, namespaceName, podName, containerName, "./bin/pulsar-admin namespaces get-subscription-expiration-time "+storagePoliciesPulsarNSName) g.Expect(err).Should(Succeed()) // Pulsar returns null once the expiration time is removed (infinite) @@ -1789,7 +1772,7 @@ var _ = Describe("Resources", func() { current := &v1alphav1.PulsarNamespace{} g.Expect(k8sClient.Get(ctx, tns, current)).Should(Succeed()) - cond := metautil.FindStatusCondition(current.Status.Conditions, string(v1alphav1.ConditionReady)) + cond := metautil.FindStatusCondition(current.Status.Conditions, v1alphav1.ConditionReady) if cond == nil { return metav1.ConditionUnknown } @@ -1799,7 +1782,7 @@ var _ = Describe("Resources", func() { // Re-fetch to get latest ResourceVersion before updating Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) - ns.Spec.BacklogQuotaRetentionPolicy = pointer.String("producer_request_hold") + ns.Spec.BacklogQuotaRetentionPolicy = ptr.To("producer_request_hold") Expect(k8sClient.Update(ctx, ns)).Should(Succeed()) }) @@ -1839,9 +1822,10 @@ var _ = Describe("Resources", func() { Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) // Update PersistencePolicies - ns.Spec.PersistencePolicies.BookkeeperEnsemble = pointer.Int32(5) - ns.Spec.PersistencePolicies.BookkeeperWriteQuorum = pointer.Int32(4) - ns.Spec.PersistencePolicies.BookkeeperAckQuorum = pointer.Int32(3) + + ns.Spec.PersistencePolicies.BookkeeperEnsemble = ptr.To[int32](5) + ns.Spec.PersistencePolicies.BookkeeperWriteQuorum = ptr.To[int32](4) + ns.Spec.PersistencePolicies.BookkeeperAckQuorum = ptr.To[int32](3) err := k8sClient.Update(ctx, ns) Expect(err).Should(Succeed()) @@ -1903,7 +1887,7 @@ var _ = Describe("Resources", func() { // podName := fmt.Sprintf("%s-broker-0", brokerName) // containerName := fmt.Sprintf("%s-broker", brokerName) // Eventually(func(g Gomega) { - // stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + // stdout, _, err := utils.ExecInPod(ctx, k8sConfig, namespaceName, podName, containerName, // "./bin/pulsar-admin namespaces get-persistence "+storagePoliciesPulsarNSName) // g.Expect(err).Should(Succeed()) // // When persistence policies are reset, broker returns default values (0) @@ -1959,8 +1943,8 @@ var _ = Describe("Resources", func() { Context("PulsarNamespace Security Configuration", Ordered, func() { var ( securityNamespace *v1alphav1.PulsarNamespace - securityNamespaceName string = "test-security-namespace" - securityPulsarNSName string = "cloud/security" + securityNamespaceName = "test-security-namespace" + securityPulsarNSName = "cloud/security" ) BeforeAll(func() { @@ -2080,7 +2064,7 @@ var _ = Describe("Resources", func() { Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) // Temporarily disable encryption requirement - ns.Spec.EncryptionRequired = pointer.Bool(false) + ns.Spec.EncryptionRequired = ptr.To(false) err := k8sClient.Update(ctx, ns) Expect(err).Should(Succeed()) @@ -2111,7 +2095,7 @@ var _ = Describe("Resources", func() { Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) // Disable producer name validation - ns.Spec.ValidateProducerName = pointer.Bool(false) + ns.Spec.ValidateProducerName = ptr.To(false) err := k8sClient.Update(ctx, ns) Expect(err).Should(Succeed()) @@ -2200,7 +2184,7 @@ var _ = Describe("Resources", func() { Eventually(func(g Gomega) { podName := fmt.Sprintf("%s-broker-0", brokerName) containerName := fmt.Sprintf("%s-broker", brokerName) - stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + stdout, _, err := utils.ExecInPod(ctx, k8sConfig, namespaceName, podName, containerName, "./bin/pulsar-admin topics permissions "+ppermission.Spec.ResourceName) g.Expect(err).Should(Succeed()) g.Expect(stdout).Should(Not(BeEmpty())) @@ -2246,7 +2230,7 @@ var _ = Describe("Resources", func() { Eventually(func(g Gomega) { podName := fmt.Sprintf("%s-broker-0", brokerName) containerName := fmt.Sprintf("%s-broker", brokerName) - stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + stdout, _, err := utils.ExecInPod(ctx, k8sConfig, namespaceName, podName, containerName, "./bin/pulsar-admin topics permissions "+ptopic.Spec.Name) g.Expect(err).Should(Succeed()) g.Expect(stdout).Should(Not(BeEmpty())) diff --git a/tests/utils/k8s.go b/tests/utils/k8s.go index 61185e01..e35d58c5 100644 --- a/tests/utils/k8s.go +++ b/tests/utils/k8s.go @@ -16,6 +16,7 @@ package utils import ( "bytes" + "context" "path/filepath" "strings" @@ -31,13 +32,13 @@ import ( // GetKubeConfig return the kubeconfig from kubeConfigPath // if the path is empty, it will get config from $HOME/.kube/config as default func GetKubeConfig(kubeConfigPath string) (*rest.Config, error) { - if len(kubeConfigPath) != 0 { + if kubeConfigPath != "" { return clientcmd.BuildConfigFromFlags("", kubeConfigPath) } return clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config")) } -func ExecInPod(config *rest.Config, namespace, podName, containerName, command string) (string, string, error) { +func ExecInPod(ctx context.Context, config *rest.Config, namespace, podName, containerName, command string) (string, string, error) { k8sCli, err := kubernetes.NewForConfig(config) if err != nil { return "", "", err @@ -68,7 +69,7 @@ func ExecInPod(config *rest.Config, namespace, podName, containerName, command s if err != nil { return "", "", err } - err = exec.Stream(remotecommand.StreamOptions{ + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ Stdin: nil, Stdout: &stdout, Stderr: &stderr, diff --git a/tests/utils/spec.go b/tests/utils/spec.go index 79d2f52b..113d1efe 100644 --- a/tests/utils/spec.go +++ b/tests/utils/spec.go @@ -19,7 +19,7 @@ import ( "os" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" @@ -531,10 +531,11 @@ func MakePulsarFunction(namespace, name, functionPackageUrl, connectionName stri TimeoutMs: ptr.To[int64](6666), Secrets: map[string]v1alpha1.FunctionSecretKeyRef{ "SECRET1": { - "sectest", "hello", + Path: "hello", + Key: "secret", }, }, - CustomRuntimeOptions: getMapToJSON(map[string]interface{}{ + CustomRuntimeOptions: getMapToJSON(map[string]any{ "env": map[string]string{ "HELLO": "WORLD", }, @@ -573,10 +574,11 @@ func MakePulsarSink(namespace, name, sinkPackageUrl, connectionName string, poli }, Secrets: map[string]v1alpha1.FunctionSecretKeyRef{ "SECRET1": { - "sectest", "hello", + Path: "hello", + Key: "secret", }, }, - CustomRuntimeOptions: getMapToJSON(map[string]interface{}{ + CustomRuntimeOptions: getMapToJSON(map[string]any{ "env": map[string]string{ "HELLO": "WORLD", }, @@ -585,23 +587,23 @@ func MakePulsarSink(namespace, name, sinkPackageUrl, connectionName string, poli } } -func getPulsarSourceConfig() *v1.JSON { - c := map[string]interface{}{ +func getPulsarSourceConfig() *apiextensionsv1.JSON { + c := map[string]any{ "sleepBetweenMessages": 1000, } bytes, err := json.Marshal(c) if err != nil { return nil } - return &v1.JSON{Raw: bytes} + return &apiextensionsv1.JSON{Raw: bytes} } -func getMapToJSON(c map[string]interface{}) *v1.JSON { +func getMapToJSON(c map[string]any) *apiextensionsv1.JSON { bytes, err := json.Marshal(c) if err != nil { return nil } - return &v1.JSON{Raw: bytes} + return &apiextensionsv1.JSON{Raw: bytes} } // MakePulsarSource will generate a object of PulsarSource @@ -632,10 +634,11 @@ func MakePulsarSource(namespace, name, sourcePackageUrl, connectionName string, }, Secrets: map[string]v1alpha1.FunctionSecretKeyRef{ "SECRET1": { - "sectest", "hello", + Path: "hello", + Key: "secret", }, }, - CustomRuntimeOptions: getMapToJSON(map[string]interface{}{ + CustomRuntimeOptions: getMapToJSON(map[string]any{ "env": map[string]string{ "HELLO": "WORLD", },