Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exec] Fixed multiple event inputs #187

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

gfalcone
Copy link
Contributor

@gfalcone gfalcone commented Apr 3, 2021

Hello !

I have an error when I use multiple event inputs like this in my Klio job :

job_config:
  allow_non_klio_messages: True
  events:
    inputs:
      - type: pubsub
        topic: projects/my-project/topics/oplog-high-latency
        subscription: projects/my-project/subscriptions/frames-input
      - type: pubsub
        topic: projects/my-project/topics/oplog-high-latency
        subscription: projects/my-project/subscriptions/frames-input

Here is the error I get when I do a klio job run:

Traceback (most recent call last):
  File "/usr/local/bin/klioexec", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/klio_core/utils.py", line 238, in wrapper
    func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/klio_exec/cli.py", line 95, in run_pipeline
    klio_pipeline.run()
  File "/usr/local/lib/python3.6/site-packages/klio_exec/commands/run.py", line 556, in run
    self._setup_pipeline(pipeline)
  File "/usr/local/lib/python3.6/site-packages/klio_exec/commands/run.py", line 529, in _setup_pipeline
    pipeline
  File "/usr/local/lib/python3.6/site-packages/klio_exec/commands/run.py", line 488, in _generate_pcoll_per_input
    | "Merge multi-input pass-thrus" >> beam.Flatten()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/transforms/ptransform.py", line 1036, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/transforms/ptransform.py", line 553, in __ror__
    'as there are no deferred inputs.' % self.label)
ValueError: "Flatten" requires a pipeline to be specified as there are no deferred inputs.

I have successfully ran my jobs with this fix.

Thank you in advance for taking a look into it :)

Checklist for PR author(s)

  • Format the pull request title like [cli] Fixes bugs in 'klio job fake-cmd'.
  • Changes are covered by unit tests (no major decrease in code coverage %) and/or integration tests.
  • Document any relevant additions/changes in the appropriate spot in docs/src.
  • For any change that affects users, update the package's changelog in docs/src/reference/<package>/changelog.rst.

@gfalcone gfalcone force-pushed the fix_multiple_event_inputs branch 2 times, most recently from 7c2d898 to 25ff120 Compare April 3, 2021 11:51
@fallonchen
Copy link
Contributor

Hi @gfalcone ! Thanks for taking the time to make this PR, and sorry for the long delay in replying!

While the proposed change makes sense to me, I am having trouble reproducing this error - can you tell me how you are using the inputs in your run.py, as well as: version of klio, Python and Beam you are using?

@gfalcone
Copy link
Contributor Author

Hello @fallonchen no problem :).

It happened on Python 3.6 and Apache Beam 2.27 (which is not the version used at the moment by Klio if I am not wrong).

If I recall, I was following the example on this page : https://docs.klio.io/en/stable/userguide/pipeline/io.html#step-2-using-multiple-inputs

@gfalcone
Copy link
Contributor Author

Hello @fallonchen, do you have any news concerning this PR ? Thank you :)

@econchick
Copy link
Contributor

Hey @gfalcone - sorry for the silence!

I'm going to close then reopen this PR to re-trigger the GitHub actions. If this still passes, I'll go ahead and merge, and it should be available in either the 21.11.0 or 21.12.0 release (we may skip 21.11.0).

@econchick econchick closed this Nov 19, 2021
@econchick econchick reopened this Nov 19, 2021
@econchick
Copy link
Contributor

fixed a separate bug that had the build broken, so let's try this re-triggering workflows again 😅

@econchick econchick closed this Nov 19, 2021
@econchick econchick reopened this Nov 19, 2021
@econchick
Copy link
Contributor

uff, okay - hey @gfalcone could I ask you to rebase this PR off of latest develop branch and push? if that's too much, I can close this PR and create a fresh one with your change in it (I'd otherwise rebase & push to this PR myself but I don't have access to your fork). I'll close if I don't hear from you after a week or so.

@gfalcone
Copy link
Contributor Author

gfalcone commented Jan 3, 2022

Hello @econchick , first of all sorry for the late reply :/. I rebased my branch with develop, it should be good now. Thank you for your help on this :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants