Reuse all later tasks to keep the DRY principle? #40

@DOH-Manada

Description

I have a few tasks of the following nature, which are pretty standard: import a processed dataset, set up the input data, split it, then pass it to a model.

# _tasks.py

import d6tflow

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):
    # loads a processed dataframe (probably pickled)
    pass

@d6tflow.requires(TaskLoadDataframe)
class TaskSetupExogEndogData(d6tflow.tasks.TaskCache):
    # do stuff; saves data and labels
    pass

@d6tflow.requires({'inputs': TaskSetupExogEndogData})
class TaskSplitData(d6tflow.tasks.TaskCache):
    # do more stuff; splits data and labels and saves to a dictionary
    pass

# _tasks_sklearn.py

import _tasks
import d6tflow
from sklearn import svm

@d6tflow.requires(_tasks.TaskSplitData)
class TaskTrainSklearnSVM(d6tflow.tasks.TaskCache):
    kernel = d6tflow.Parameter(default='linear')

    def run(self):
        data = self.inputLoad()
        model_svm = svm.SVR(kernel = self.kernel)
        model_svm.fit(data['train_data'], data['train_labels'])
        model_svm.score(data['valid_data'], data['valid_labels'])
        # TODO: self.save model artifacts

Context: I would obviously want to reuse as much of this as possible.

Question 1: Is it possible to create several independent dataset-processing tasks that I can set as the "initial task" of this workflow?
Question 2: If yes, how would I call one of them as a dynamic requirement in TaskLoadDataframe?
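One way to sketch an answer to both questions is a registry that maps a string parameter to a concrete loader task, with the dispatch living in an overridden requires(). The sketch below is plain Python, not d6tflow: the classes, the `LOADERS` registry, and the `loader` parameter name are all my own stand-ins, and a real d6tflow version would subclass d6tflow.tasks.TaskCache and let the scheduler call requires() rather than calling it directly.

```python
# Plain-Python sketch: choose the "initial task" of a workflow by name.
# In d6tflow, each loader would be a Task subclass and the dispatch
# would live in an overridden requires() method on TaskLoadDataframe.

class LoadFromCsv:
    def run(self):
        return {"source": "csv"}

class LoadFromSql:
    def run(self):
        return {"source": "sql"}

# Registry mapping a string parameter value to a concrete loader task.
LOADERS = {"csv": LoadFromCsv, "sql": LoadFromSql}

class TaskLoadDataframe:
    def __init__(self, loader="csv"):
        self.loader = loader

    def requires(self):
        # Dynamic requirement: dispatch on the parameter value.
        return LOADERS[self.loader]()

    def run(self):
        # Stand-in for the scheduler running the upstream task first.
        return self.requires().run()

print(TaskLoadDataframe(loader="sql").run())  # {'source': 'sql'}
```

The same pattern extends to any number of independent preprocessing tasks: register a new class under a new key and select it via the parameter, without touching the downstream tasks.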

Solution A:
It seems the best way to handle this within the scope of this package is to not create a TaskA and just do the following:

  1. Preprocess the dataframe
  2. Export to a pickle (or csv)
  3. Read the path to the exported file in as a parameter to TaskLoadDataframe, so I can run the workflow and continue on.
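The three steps of Solution A can be sketched end to end with the standard library alone; the function names (`preprocess`, `export`, `load_dataframe`) are illustrative stand-ins, and any dataframe library can replace the plain dict used here.

```python
# Sketch of Solution A: preprocess outside the flow, pickle the result
# to disk, and hand only the file path to the loading step as a parameter.
import pickle
import tempfile
from pathlib import Path

def preprocess():
    # stand-in for the real preprocessing; returns plain data
    return {"x": [1, 2, 3], "y": [0, 1, 0]}

def export(data, path):
    # step 2: export the processed data to a pickle
    with open(path, "wb") as f:
        pickle.dump(data, f)

def load_dataframe(path):
    # step 3: what TaskLoadDataframe would do with a path parameter
    with open(path, "rb") as f:
        return pickle.load(f)

path = Path(tempfile.mkdtemp()) / "processed.pkl"
export(preprocess(), path)
print(load_dataframe(path))  # {'x': [1, 2, 3], 'y': [0, 1, 0]}
```

Passing a short path string as the parameter keeps the parameter hashable and serializable, which is the usual constraint in Luigi-style frameworks.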

Solution B:
I know Luigi doesn't allow passing dataframes as parameters, but could I pass a dataframe into the run of a task as a means of reducing, or completely removing, the file I/O in step 2?

class TaskLoadDataframe(d6tflow.tasks.TaskCachePandas):

    def run(self, dataframe):
        self.save(dataframe)

I don't think the source code allows for this; what would the syntax be to run that as a workflow?

Solution B (reprise): Alternatively, I could save the processed dataframe in a dictionary and pass it into TaskLoadDataframe as a d6tflow-defined parameter.
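The reprise idea can be sketched as an in-memory store keyed by a short string, so the task parameter stays a plain string while the dataframe itself never touches disk. Again this is plain Python, not d6tflow: `DATA_STORE`, `register`, and the `data_key` parameter are my own names for illustration.

```python
# Sketch of Solution B (reprise): keep the processed data in an
# in-memory dictionary and pass only a string key as the task
# parameter, since parameters must be hashable, not dataframes.
DATA_STORE = {}

def register(key, data):
    # store the preprocessed data under a short lookup key
    DATA_STORE[key] = data

class TaskLoadDataframe:
    def __init__(self, data_key):
        self.data_key = data_key

    def run(self):
        # look the data up instead of receiving it as a run() argument
        return DATA_STORE[self.data_key]

register("run-2024-01", {"train": [1, 2], "valid": [3]})
print(TaskLoadDataframe("run-2024-01").run())  # {'train': [1, 2], 'valid': [3]}
```

The trade-off versus Solution A is persistence: an in-memory store vanishes between processes, so cached results can't be reused across runs the way a pickled file can.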

Thoughts? Great work on this by the way.
