Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,7 @@ protected RunnerApi.Pipeline resolveArtifacts(RunnerApi.Pipeline pipeline) {
protected List<DataflowPackage> stageArtifacts(RunnerApi.Pipeline pipeline) {
ImmutableList.Builder<StagedFile> filesToStageBuilder = ImmutableList.builder();
Set<String> stagedNames = new HashSet<>();
Map<String, String> hashes = new java.util.HashMap<>();
for (Map.Entry<String, RunnerApi.Environment> entry :
pipeline.getComponents().getEnvironmentsMap().entrySet()) {
for (RunnerApi.ArtifactInformation info : entry.getValue().getDependenciesList()) {
Expand Down Expand Up @@ -1172,10 +1173,14 @@ protected List<DataflowPackage> stageArtifacts(RunnerApi.Pipeline pipeline) {
} else {
stagedNames.add(stagedName);
}
hashes.put(stagedName, filePayload.getSha256());
filesToStageBuilder.add(
StagedFile.of(filePayload.getPath(), filePayload.getSha256(), stagedName));
}
}
options
.as(org.apache.beam.runners.dataflow.options.DataflowPipelineOptions.class)
.setArtifactHashes(hashes);
return options.getStager().stageFiles(filesToStageBuilder.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ public String create(PipelineOptions options) {

void setHotKeyLoggingEnabled(boolean value);

@Description("Map of staged artifact names to their expected SHA-256 hashes.")
Map<String, String> getArtifactHashes();

void setArtifactHashes(Map<String, String> hashes);

/**
* Open modules needed for reflection that access JDK internals with Java 9+
*
Expand Down
3 changes: 3 additions & 0 deletions sdks/go/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func main() {
logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err)
}

// Inject full pipeline options into context
ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions())

// (2) Retrieve the staged files.
//
// The Go SDK harness downloads the worker binary and invokes
Expand Down
69 changes: 65 additions & 4 deletions sdks/go/pkg/beam/artifact/materialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/errorx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"google.golang.org/protobuf/proto"
structpb "google.golang.org/protobuf/types/known/structpb"
)

// TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto
Expand Down Expand Up @@ -83,6 +84,8 @@ func newMaterializeWithClient(ctx context.Context, client jobpb.ArtifactRetrieva
return nil, err
}

hashes := extractArtifactHashes(ctx)

var artifacts []*pipepb.ArtifactInformation
var list []retrievable
for _, dep := range resolution.Replacements {
Expand Down Expand Up @@ -131,6 +134,14 @@ func newMaterializeWithClient(ctx context.Context, client jobpb.ArtifactRetrieva
RoleUrn: URNStagingTo,
RolePayload: rolePayload,
},
expectedSha256: func() string {
if hashes != nil {
if trusted, ok := hashes[path]; ok {
return trusted
}
}
return filePayload.Sha256
}(),
})
}

Expand Down Expand Up @@ -183,8 +194,9 @@ func MustExtractFilePayload(artifact *pipepb.ArtifactInformation) (string, strin
}

type artifact struct {
client jobpb.ArtifactRetrievalServiceClient
dep *pipepb.ArtifactInformation
client jobpb.ArtifactRetrievalServiceClient
dep *pipepb.ArtifactInformation
expectedSha256 string
}

func (a artifact) retrieve(ctx context.Context, dest string) error {
Expand Down Expand Up @@ -229,9 +241,17 @@ func (a artifact) retrieve(ctx context.Context, dest string) error {
return errors.Wrapf(err, "failed to flush chunks for %v", filename)
}
stat, _ := fd.Stat()
log.Printf("Downloaded: %v (sha256: %v, size: %v)", filename, sha256Hash, stat.Size())
log.Printf("Downloaded: %v (sha256: %v, size: %v, expectedSha256: %v)", filename, sha256Hash, stat.Size(), a.expectedSha256)

if err := fd.Close(); err != nil {
return err
}

return fd.Close()
if a.expectedSha256 != "" && sha256Hash != a.expectedSha256 {
return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.expectedSha256)
}

return nil
}

func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w io.Writer) (string, error) {
Expand Down Expand Up @@ -271,7 +291,14 @@ func legacyMaterialize(ctx context.Context, endpoint string, rt string, dest str

var artifacts []*pipepb.ArtifactInformation
var list []retrievable

hashes := extractArtifactHashes(ctx)

for _, md := range mds {
if trustedHash, ok := hashes[md.Name]; ok && trustedHash != "" {
md.Sha256 = trustedHash
}

typePayload, err := proto.Marshal(&pipepb.ArtifactFilePayload{
Path: md.Name,
Sha256: md.Sha256,
Expand Down Expand Up @@ -511,3 +538,37 @@ func queue2slice(q chan *jobpb.ArtifactMetadata) []*jobpb.ArtifactMetadata {
}
return ret
}

// contextKey is an unexported key type for values this package stores on a
// context.Context, preventing collisions with keys defined by other packages.
type contextKey string

// pipelineOptionsKey is the context key under which the full pipeline
// options proto (as provided to the boot container) is carried.
const pipelineOptionsKey contextKey = "pipeline_options"

// WithPipelineOptions returns a child context carrying the raw pipeline
// options proto, so that artifact retrieval code downstream can consult it
// (e.g. for trusted artifact hashes).
func WithPipelineOptions(ctx context.Context, opts *structpb.Struct) context.Context {
	return context.WithValue(ctx, pipelineOptionsKey, opts)
}

// extractArtifactHashes returns the trusted artifact-name -> SHA-256 map
// embedded in the pipeline options carried on ctx (under
// options.artifactHashes). It returns nil when the context carries no
// pipeline options or they do not declare any artifact hashes.
func extractArtifactHashes(ctx context.Context) map[string]string {
	options, ok := ctx.Value(pipelineOptionsKey).(*structpb.Struct)
	if !ok || options == nil {
		return nil
	}
	pipelineOptions, ok := options.GetFields()["options"]
	if !ok {
		return nil
	}
	hashesOption, ok := pipelineOptions.GetStructValue().GetFields()["artifactHashes"]
	if !ok {
		return nil
	}
	hashesStruct := hashesOption.GetStructValue()
	if hashesStruct == nil {
		return nil
	}
	hashes := make(map[string]string)
	for k, v := range hashesStruct.GetFields() {
		// Only keep non-empty string values. GetStringValue returns "" for
		// non-string kinds; a present-but-empty entry would otherwise
		// override the artifact service's real hash at lookup sites that
		// check only key presence, silently disabling SHA-256 verification.
		if s := v.GetStringValue(); s != "" {
			hashes[k] = s
		}
	}
	return hashes
}
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/dataflow/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
log.Info(ctx, "Dry-run: not submitting job!")

log.Info(ctx, model.String())
job, err := dataflowlib.Translate(ctx, model, opts, workerURL, modelURL)
job, err := dataflowlib.Translate(ctx, model, opts, workerURL, modelURL, nil)
if err != nil {
return nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker

// (3) Translate to v1b3 and submit

job, err := Translate(ctx, raw, opts, workerURL, modelURL)
hashes := map[string]string{
"worker": hash,
}

job, err := Translate(ctx, raw, opts, workerURL, modelURL, hashes)
if err != nil {
return presult, err
}
Expand Down
21 changes: 12 additions & 9 deletions sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func containerImages(p *pipepb.Pipeline) ([]*df.SdkHarnessContainerImage, []stri
}

// Translate translates a pipeline to a Dataflow job.
func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, workerURL, modelURL string) (*df.Job, error) {
func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, workerURL, modelURL string, hashes map[string]string) (*df.Job, error) {
// (1) Translate pipeline to v1b3 speak.

jobType := "JOB_TYPE_BATCH"
Expand Down Expand Up @@ -155,6 +155,7 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
}

opts.Options.Options["experiments"] = strings.Join(opts.Experiments, ",")

job := &df.Job{
ProjectId: opts.Project,
Name: opts.Name,
Expand All @@ -176,10 +177,11 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
SdkPipelineOptions: newMsg(pipelineOptions{
DisplayData: printOptions(opts, images),
Options: dataflowOptions{
PipelineURL: modelURL,
Region: opts.Region,
Experiments: opts.Experiments,
TempLocation: opts.TempLocation,
PipelineURL: modelURL,
Region: opts.Region,
Experiments: opts.Experiments,
TempLocation: opts.TempLocation,
ArtifactHashes: hashes,
},
GoOptions: opts.Options,
}),
Expand Down Expand Up @@ -350,10 +352,11 @@ func GetMetrics(ctx context.Context, client *df.Service, project, region, jobID
// pipeline options that are communicated to cross-language SDK harnesses, so any pipeline options
// needed for cross-language transforms in Dataflow must be declared here.
type dataflowOptions struct {
Experiments []string `json:"experiments,omitempty"`
PipelineURL string `json:"pipelineUrl"`
Region string `json:"region"`
TempLocation string `json:"tempLocation"`
Experiments []string `json:"experiments,omitempty"`
PipelineURL string `json:"pipelineUrl"`
Region string `json:"region"`
TempLocation string `json:"tempLocation"`
ArtifactHashes map[string]string `json:"artifactHashes,omitempty"`
}

func printOptions(opts *JobOptions, images []string) []*displayData {
Expand Down
3 changes: 3 additions & 0 deletions sdks/java/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ func main() {
logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err)
}

// Inject full pipeline options into context
ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions())

// (2) Retrieve the staged user jars. We ignore any disk limit,
// because the staged jars are mandatory.

Expand Down
15 changes: 11 additions & 4 deletions sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ def __init__(
options,
environment_version,
proto_pipeline_staged_url,
proto_pipeline=None):
proto_pipeline=None,
hashes=None):
self.standard_options = options.view_as(StandardOptions)
self.google_cloud_options = options.view_as(GoogleCloudOptions)
self.worker_options = options.view_as(WorkerOptions)
Expand Down Expand Up @@ -276,6 +277,9 @@ def __init__(
k: v
for k, v in sdk_pipeline_options.items() if v is not None
}
if hashes:
import json
options_dict["artifactHashes"] = hashes
options_dict["pipelineUrl"] = proto_pipeline_staged_url
# Don't pass impersonate_service_account through to the harness.
# Though impersonation should start a job, the workers should
Expand Down Expand Up @@ -577,6 +581,7 @@ def _stage_resources(self, pipeline, options):
raise RuntimeError('The --temp_location option must be specified.')

resources = []
hashes = {}
staged_paths = {}
staged_hashes = {}
for _, env in sorted(pipeline.components.environments.items(),
Expand Down Expand Up @@ -628,6 +633,7 @@ def _stage_resources(self, pipeline, options):
else:
resources.append(
(type_payload.path, remote_name, type_payload.sha256))
hashes[remote_name] = type_payload.sha256
staged_paths[type_payload.path] = remote_name
staged_hashes[type_payload.sha256] = remote_name

Expand All @@ -647,7 +653,7 @@ def _stage_resources(self, pipeline, options):
resource_stager = _LegacyDataflowStager(self)
staged_resources = resource_stager.stage_job_resources(
resources, staging_location=google_cloud_options.staging_location)
return staged_resources
return staged_resources, hashes

def stage_file(
self,
Expand Down Expand Up @@ -826,7 +832,7 @@ def create_job_description(self, job):
job.proto_pipeline, self._sdk_image_overrides, job.options)

# Stage other resources for the SDK harness
resources = self._stage_resources(job.proto_pipeline, job.options)
resources, hashes = self._stage_resources(job.proto_pipeline, job.options)

# Stage proto pipeline.
self.stage_file_with_retry(
Expand All @@ -841,7 +847,8 @@ def create_job_description(self, job):
packages=resources,
options=job.options,
environment_version=self.environment_version,
proto_pipeline=job.proto_pipeline).proto
proto_pipeline=job.proto_pipeline,
hashes=hashes).proto
_LOGGER.debug('JOB: %s', job)

@retry.with_exponential_backoff(num_retries=3, initial_delay_secs=3)
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ func launchSDKProcess() error {
logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err)
}

// Inject full pipeline options into context
ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions())

experiments := getExperiments(options)
pipNoBuildIsolation = false
if slices.Contains(experiments, "pip_no_build_isolation") {
Expand Down
Loading