
[WIP] Fix issue #285: save hive partitioned dataset using NativeExecutionEngine and DaskExecutionEngine #306

Merged (18 commits) on Apr 3, 2022

Conversation

@LaurentErreca (Contributor) commented Feb 28, 2022

Enable saving a hive-partitioned dataset using the Native (pandas) or Dask execution engine. Work is still in progress, as test_take with Dask is failing.
Related to #285
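
For context, a hive-partitioned save writes one subdirectory per distinct partition value instead of a single file. A minimal sketch of the target behavior using plain pandas/pyarrow (the path and data here are illustrative, not from this PR):

    import pandas as pd

    df = pd.DataFrame({"c": [6, 2], "a": [1, 7]})
    # partition_cols makes pyarrow write a hive-partitioned dataset:
    df.to_parquet("/tmp/c.partition", partition_cols=["c"])
    # Resulting layout:
    #   /tmp/c.partition/c=2/<part>.parquet
    #   /tmp/c.partition/c=6/<part>.parquet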

@kvnkho (Collaborator) commented Mar 3, 2022

I looked at the tests here, and from what I see, the only failure is in the DuckDB execution engine, in the IO tests you added. Below is the log of the failing test. The test_take tests seem to be fine.

self = <tests.fugue_duckdb.test_execution_engine.DuckBuiltInTests testMethod=test_io>

    def test_io(self):
        path = os.path.join(self.tmpdir, "a")
        path2 = os.path.join(self.tmpdir, "b.test.csv")
        path3 = os.path.join(self.tmpdir, "c.partition")
        with self.dag() as dag:
            b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
            b.partition(num=3).save(path, fmt="parquet", single=True)
            b.save(path2, header=True)
        assert FileSystem().isfile(path)
        with self.dag() as dag:
            a = dag.load(path, fmt="parquet", columns=["a", "c"])
            a.assert_eq(dag.df([[1, 6], [7, 2]], "a:long,c:int"))
            a = dag.load(path2, header=True, columns="c:int,a:long")
            a.assert_eq(dag.df([[6, 1], [2, 7]], "c:int,a:long"))
        with self.dag() as dag:
            b = dag.df([[6, 1], [2, 7]], "c:int,a:long")
            b.partition(by='c').save(path3, fmt="parquet", single=False)
            b.save(path2, header=True)
>       assert FileSystem().isdir(path3)
E       AssertionError

So the issue here appears to be that the added tests expect partitioned files as output (which is what this PR is for), but DuckDB fails the test because it doesn't support partitioned writes yet.

How do we proceed with this, @goodwanghan?

A review comment was left on this snippet from the diff:

    # TODO: in the test below, once issue #288 is fixed, use dag.load instead of pd.read_parquet
    pd.testing.assert_frame_equal(
        pd.read_parquet(path3).sort_values('a').reset_index(drop=True),
        pd.DataFrame({'c': pd.Categorical([6, 2]), 'a': [1, 7]}).reset_index(drop=True),
    )
Collaborator: Should we assert that c is an int type?
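
Context for the question: when pyarrow reads a hive-partitioned dataset back, partition keys come back dictionary-encoded, which pandas surfaces as a categorical column; that is presumably why the expected frame above uses pd.Categorical rather than an int column. A quick illustration (the path is hypothetical):

    import pandas as pd

    df = pd.DataFrame({"c": [6, 2], "a": [1, 7]})
    df.to_parquet("/tmp/c.partition", partition_cols=["c"])
    back = pd.read_parquet("/tmp/c.partition")
    # The partition column round-trips as categorical, not int:
    print(back["c"].dtype)  # category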

@goodwanghan (Collaborator)

The failure on the DuckDB test is expected, because DuckDB itself does not have a partitioning feature. But it is quite straightforward to convert a DuckDB dataframe to an Arrow table (as_arrow) and then save to parquet with partitioning, so we should also make that change; then this looks super nice.
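
A minimal sketch of that conversion path, using the standard pyarrow API (the table is built inline here so the snippet is self-contained; in the engine it would come from as_arrow()):

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({"c": [6, 2], "a": [1, 7]})  # stand-in for df.as_arrow()
    # write_to_dataset with partition_cols produces the hive layout (c=2/, c=6/):
    pq.write_to_dataset(table, root_path="/tmp/c.partition", partition_cols=["c"])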

@goodwanghan (Collaborator)

I apologize for the delay; my daughter was born in February, so I have been extremely busy recently.

The PR looks great; we just need to change the DuckDB IO function to pass the test suite.

You can see we have other work items to redesign the IO part for all engines, because the current design is a bit messy. But please continue working on this PR and merge it first; the code refactoring can happen later. As long as we have good unit tests, we can ship this feature first.

Thank you!

@LaurentErreca (Contributor, Author) commented Mar 4, 2022 via email

@goodwanghan (Collaborator)

@LaurentErreca do you plan to finalize this PR soon?

@LaurentErreca (Contributor, Author) commented Mar 28, 2022 via email

@LaurentErreca (Contributor, Author)

I've been working on this issue. For DuckDB to pass the test, it must be able to write a hive-partitioned dataset. I propose to use Arrow with the partition_cols parameter instead of the DuckDB write function, which uses COPY TO under the hood; COPY TO doesn't support hive-partitioned writes.

@goodwanghan (Collaborator)

> I've been working on this issue. For DuckDB to pass the test, it must be able to write a hive-partitioned dataset. I propose to use Arrow with the partition_cols parameter instead of the DuckDB write function, which uses COPY TO under the hood; COPY TO doesn't support hive-partitioned writes.

Yes, exactly. We should just convert it to Arrow and save (only if a partition key is specified). It is straightforward to convert a DuckDB dataframe to an Arrow table (as_arrow) and then save to parquet with partitioning.

@codecov-commenter commented Apr 3, 2022

Codecov Report

Merging #306 (b3e86b5) into master (281fe2a) will not change coverage.
The diff coverage is 100.00%.

@@            Coverage Diff            @@
##            master      #306   +/-   ##
=========================================
  Coverage   100.00%   100.00%           
=========================================
  Files          101       102    +1     
  Lines         9480      9511   +31     
=========================================
+ Hits          9480      9511   +31     
Impacted Files                                 Coverage Δ
fugue/_utils/interfaceless.py                  100.00% <100.00%> (ø)
fugue/_utils/register.py                       100.00% <100.00%> (ø)
fugue/execution/__init__.py                    100.00% <100.00%> (ø)
fugue/execution/factory.py                     100.00% <100.00%> (ø)
fugue/execution/native_execution_engine.py     100.00% <100.00%> (ø)
fugue/extensions/_utils.py                     100.00% <100.00%> (ø)
fugue/workflow/utils.py                        100.00% <100.00%> (ø)
fugue_dask/execution_engine.py                 100.00% <100.00%> (ø)
fugue_duckdb/_io.py                            100.00% <100.00%> (ø)
fugue_duckdb/execution_engine.py               100.00% <100.00%> (ø)
... and 4 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@goodwanghan (Collaborator)

It looks good; you only have a linting issue left, @LaurentErreca.

@LaurentErreca (Contributor, Author)

> I've been working on this issue. For DuckDB to pass the test, it must be able to write a hive-partitioned dataset. I propose to use Arrow with the partition_cols parameter instead of the DuckDB write function, which uses COPY TO under the hood; COPY TO doesn't support hive-partitioned writes.

> Yes, exactly. We should just convert it to Arrow and save (only if a partition key is specified). It is straightforward to convert a DuckDB dataframe to an Arrow table (as_arrow) and then save to parquet with partitioning.

This is done by replacing:

    if p.file_format not in self._format_save:
        self._fs.makedirs(os.path.dirname(uri), recreate=True)
        ldf = ArrowDataFrame(df.native.arrow())

with:

    if (p.file_format not in self._format_save) or ("partition_cols" in kwargs):
        self._fs.makedirs(os.path.dirname(uri), recreate=True)
        ldf = ArrowDataFrame(df.native.arrow())

@goodwanghan (Collaborator)

> This is done by replacing:
>     if p.file_format not in self._format_save:
> with:
>     if (p.file_format not in self._format_save) or ("partition_cols" in kwargs):

I think you should use df.as_arrow() instead of df.native.arrow(), because as_arrow works on any dataframe; df.native assumes it is a DuckDB relation, and sometimes it is not.

    @@ -70,7 +70,7 @@ def save_df(
                 NotImplementedError(f"{mode} is not supported"),
             )
             p = FileParser(uri, format_hint).assert_no_glob()
    -        if p.file_format not in self._format_save:
    +        if (p.file_format not in self._format_save) or ("partition_cols" in kwargs):
                 self._fs.makedirs(os.path.dirname(uri), recreate=True)
                 ldf = ArrowDataFrame(df.native.arrow())
A collaborator commented on this change:

Please use ArrowDataFrame(df.as_arrow()) instead.

The author replied:

Ok.
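
With both changes applied, the branch sketched in the diff above would presumably read:

    if (p.file_format not in self._format_save) or ("partition_cols" in kwargs):
        self._fs.makedirs(os.path.dirname(uri), recreate=True)
        ldf = ArrowDataFrame(df.as_arrow())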
