
Improve parallelisation of evaluations #481

Open
PierreGtch opened this issue Sep 12, 2023 · 3 comments
@PierreGtch (Collaborator)
After discussions at the braindecode code sprint, and following up on #460, I think we should break down the evaluations into something like this:

import abc
from copy import deepcopy

from joblib import Parallel, delayed


class BaseEvaluation:
    def __init__(
            self,
            ...
            n_nodes=1,  # number of data chunks to load in memory in parallel
            n_jobs=1,   # number of jobs per data chunk; one job fits one pipeline on one fold
    ):
        self.n_nodes = n_nodes
        self.n_jobs = n_jobs

    @abc.abstractmethod
    def get_splits(self) -> list[tuple[dict, list[tuple[dict, list[int], list[int]]]]]:
        """
        Return a list of pairs with:
          * a dict of arguments to pass to self.paradigm.get_data to load a minimal data chunk
          * a list of splits for this data chunk, i.e. triplets with:
            - dict describing the split,
            - list of train indices,
            - list of test indices.
        """

    def process(self, pipelines):
        splits = self.get_splits()
        splits_todo = []
        for datachunk_args, chunk_splits in splits:
            missing_results = self.results.not_yet_computed(datachunk_args, chunk_splits, pipelines)
            if missing_results:
                splits_todo.append((datachunk_args, chunk_splits, missing_results))
        Parallel(n_jobs=self.n_nodes)(
            delayed(self.process_datachunk)(pipelines, *args) for args in splits_todo
        )
        return self.results.to_dataframe(pipelines=pipelines, ...)

    def process_datachunk(self, pipelines, datachunk_args, chunk_splits, missing_results):
        X, y, metadata = self.paradigm.get_data(**datachunk_args)
        Parallel(n_jobs=self.n_jobs)(
            delayed(self.process_split)(p, X, y, metadata, datachunk_args, *split)
            for split in chunk_splits for p in pipelines
        )

    def process_split(self, clf, X, y, metadata, datachunk_args, split_args, train_idx, test_idx):
        clf = deepcopy(clf)
        clf.fit(X[train_idx], y[train_idx])
        score = clf.score(X[test_idx], y[test_idx])
        self.results.add(datachunk_args, split_args, clf, score)

This would remove all the for loops we have in the different evaluations and allow for larger parallelisation.
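For concreteness, here is what the nested structure returned by get_splits could look like for a within-session-style evaluation (the dataset name, dict keys, and indices are all hypothetical):

```python
# Hypothetical return value of get_splits: one data chunk (one subject),
# with two folds inside one session.
splits = [
    (
        # arguments forwarded to self.paradigm.get_data for this data chunk
        {"dataset": "dataset_A", "subjects": [1]},
        # splits within the chunk: (split description, train indices, test indices)
        [
            ({"subject": 1, "session": "0", "fold": 0}, [2, 3, 4, 5], [0, 1]),
            ({"subject": 1, "session": "0", "fold": 1}, [0, 1, 4, 5], [2, 3]),
        ],
    ),
]
```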

@bruAristimunha (Collaborator)
Ping @tomMoral to join the conversation.

@tomMoral (Collaborator)
The proposed pattern couples the code that performs the evaluation (running the pipelines + parallelization) with the process that decides the splits. I would recommend decoupling them further, in line with what scikit-learn does, so that the API is similar and the various concepts are easy to grasp.

Basically, get_splits serves the same purpose as the BaseCrossValidator object in scikit-learn.
That API works with three methods:

  • __init__: sets up the parameters of the split, if any.
  • get_n_splits: takes a dataset and returns the number of splits (for instance, with leave-one-subject-out, the number of subjects).
  • split: a generator that takes the dataset as input and, when iterated, yields (train_idx, test_idx) pairs.
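Such a cross-validator could be sketched as follows, assuming the splits can be derived from a per-example subject label rather than the dataset object itself (the class name and signatures are illustrative, not scikit-learn's actual base class):

```python
import numpy as np


class LeaveOneSubjectOut:
    """Hypothetical scikit-learn-style cross-validator: each split holds out
    all the examples of one subject."""

    def __init__(self):
        pass  # this splitter has no parameters

    def get_n_splits(self, subjects):
        # one split per unique subject
        return len(np.unique(subjects))

    def split(self, subjects):
        # subjects: per-example subject label; yields (train_idx, test_idx)
        subjects = np.asarray(subjects)
        indices = np.arange(len(subjects))
        for subject in np.unique(subjects):
            test_mask = subjects == subject
            yield indices[~test_mask], indices[test_mask]
```

With subjects = [1, 1, 2, 2, 3, 3], get_n_splits returns 3 and the first split holds out examples 0 and 1.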

Coming back to the Evaluation object, I guess you would then have a single one, such that:

import joblib
import pandas as pd
from copy import deepcopy
from joblib import Parallel, delayed

memory = joblib.Memory(location="__cache__")


class Evaluation:
    def __init__(
            self,
            ...
            n_jobs=1,  # number of parallel jobs; one job fits one pipeline on one fold
            cv="intersubject",
    ):
        self.n_jobs = n_jobs
        if isinstance(cv, str):  # make it easy if you want default parameters for cv
            cv = CV_CLASSES[cv]()
        self.cv = cv

    def process(self, pipelines, datasets):
        results = Parallel(n_jobs=self.n_jobs)(
            delayed(self.process_split)(p, d, train_idx, test_idx)
            for p in pipelines for d in datasets
            for (train_idx, test_idx) in self.cv.split(d)
        )
        return pd.DataFrame(results)

    @memory.cache
    def process_split(self, clf, dataset, train_idx, test_idx):
        clf = deepcopy(clf)
        X_train, X_test, y_train, y_test, metadata = self.paradigm.get_data(
            dataset, train_idx, test_idx
        )
        clf.fit(X_train, y_train)
        score = clf.score(X_test, y_test)
        return {'metadata': metadata, 'clf': clf, 'score': score}

Note that I changed the manual caching to use joblib.Memory, which is designed for caching calls to a function, and I flattened the parallelism (joblib is bad with nested parallelism).
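As a minimal illustration of the joblib.Memory mechanism (the function and dataset names are hypothetical), a cached function is only executed on a cache miss; a repeated call with the same arguments reads the pickled result back from disk:

```python
import tempfile

import joblib

# Cache directory; joblib stores the pickled output of each call, keyed by
# the function and its arguments (a temporary directory for this example).
memory = joblib.Memory(location=tempfile.mkdtemp(), verbose=0)

calls = []  # records actual executions, to show when the cache is hit


@memory.cache
def load_subject(dataset_name, subject):
    # stand-in for an expensive paradigm.get_data call (hypothetical names)
    calls.append((dataset_name, subject))
    return f"data for {dataset_name}/{subject}"


load_subject("dataset_A", 1)  # executed, result written to disk
load_subject("dataset_A", 1)  # read back from disk, not re-executed
```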

@PierreGtch (Collaborator, Author)
Thanks @tomMoral for your feedback!! But I'm not sure this would completely work, because we have some quite specific constraints:

  • One of the most expensive steps is the call to paradigm.get_data, because it loads the data from disk and pre-processes it, so we would like to call it only once for all the splits and pipelines. Do you think this could be achieved through joblib.Memory?
  • Additionally, some datasets are quite large (more than 50GB), so we need to be able to:
    • load only the minimal amount of data, i.e. one subject (except in the cross-subject case),
    • do all the evaluations on it,
    • free the memory,
    • load the next minimal data chunk...
  • Finally, I don't think we can define train_idx and test_idx before loading the data, because the only information we have about a dataset is the number of subjects it contains. We don't know the number of sessions or the number of examples per session before loading the data. Maybe we should try to change that? @bruAristimunha @sylvchev

This is why I proposed the nested parallelism. Maybe an in-between would be to implement BaseCrossValidators that receive only the data of one subject as input instead of a whole dataset?
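A minimal sketch of that in-between, assuming split receives the labels and per-example metadata of a single, already-loaded subject, so the sessions can be discovered from the metadata rather than known up front (the class name, column names, and fold strategy are all hypothetical):

```python
import numpy as np
from sklearn.model_selection import StratifiedKFold


class PerSubjectSessionSplitter:
    """Hypothetical cross-validator applied to one subject's loaded data:
    it reads the sessions from the metadata and yields stratified folds
    within each session."""

    def __init__(self, n_folds=5):
        self.n_folds = n_folds

    def split(self, y, metadata):
        # metadata: per-example dataframe as returned by paradigm.get_data
        for session in metadata["session"].unique():
            session_idx = np.flatnonzero((metadata["session"] == session).to_numpy())
            skf = StratifiedKFold(n_splits=self.n_folds)
            # StratifiedKFold yields positions into session_idx; map them
            # back to indices into the subject's full data
            for train, test in skf.split(session_idx, y[session_idx]):
                yield session, session_idx[train], session_idx[test]
```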

@bruAristimunha bruAristimunha linked a pull request Sep 20, 2023 that will close this issue