
Fix csv source for the alpakka-sample-http-csv-to-kafka #68

Open
giorgiopers opened this issue Nov 20, 2020 · 5 comments

Comments

@giorgiopers

Nasdaq has changed the way it shares data with the outside world.
Just to see the sample working, I used another open-source CSV instead:

val uriCsv = "https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2019-financial-year-provisional/Download-data/annual-enterprise-survey-2019-financial-year-provisional-csv.csv"

It's probably not the best link to use, but right now the project doesn't work at all because its data source is gone.

@giorgiopers
Author

I'm also looking into why the publishing flow just hangs when this error occurs.
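One common reason an Akka Streams app appears to hang after an error, offered here as an assumption since the sample's publishing code isn't quoted in this thread, is that the `ActorSystem` is only terminated from a success callback such as `Future.map`. The plain-Scala sketch below (no Akka; `ShutdownOnFailure`, `streamResult` and `terminated` are hypothetical stand-ins) shows that a `map` callback never runs when the future fails, while `onComplete` runs either way.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object ShutdownOnFailure {
  // Stand-in for a stream's materialized Future[Done]; it fails on purpose here
  val streamResult: Future[Unit] =
    Future.failed(new RuntimeException("simulated source failure"))

  // Stand-in for actorSystem.terminate(): records which callback requested shutdown
  val terminated: Promise[String] = Promise[String]()

  // Pattern that can hang: the function passed to `map` runs only on success,
  // so after a failure nothing ever asks the system to shut down
  streamResult.map(_ => terminated.trySuccess("terminated via map"))

  // Safer pattern: `onComplete` runs on success and on failure
  streamResult.onComplete {
    case Success(_) => terminated.trySuccess("terminated after success")
    case Failure(e) => terminated.trySuccess(s"terminated after failure: ${e.getMessage}")
  }

  def main(args: Array[String]): Unit =
    println(Await.result(terminated.future, 5.seconds))
}
```

In a real Akka app the same idea means calling `actorSystem.terminate()` from `onComplete` (or `andThen`) on the stream's `Future[Done]`, not from `map`.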

@ennru
Member

ennru commented Nov 20, 2020

Thank you for suggesting a new data source for this example.

@PKRoma

PKRoma commented Aug 8, 2021

I have found that the Nasdaq URI can be replaced with https://api.nasdaq.com/api/screener/stocks?tableonly=true&limit=25&exchange=NASDAQ&download=true and it works.

However, this new Nasdaq URI only responds to HTTP/2 requests, so the example code must be changed to support HTTP/2. This is no mean feat; it isn't as simple as making a change here or there.

It would be great if the examples were updated to use HTTP/2.
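A quick way to check the HTTP/2 behaviour described above without Akka is the JDK 11+ `java.net.http.HttpClient`, which can be told to prefer HTTP/2. This is a swapped-in technique, not what the sample uses, and `NasdaqHttp2Check` is a hypothetical name; whether the endpoint still answers, or needs extra headers, is not verified here.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.time.Duration

object NasdaqHttp2Check {
  val uri: URI = URI.create(
    "https://api.nasdaq.com/api/screener/stocks?tableonly=true&limit=25&exchange=NASDAQ&download=true")

  // Prefer HTTP/2; over TLS the JDK client negotiates the protocol via ALPN
  val client: HttpClient = HttpClient.newBuilder()
    .version(HttpClient.Version.HTTP_2)
    .connectTimeout(Duration.ofSeconds(10))
    .build()

  // Same Accept header as the Akka HTTP hack in this thread
  val request: HttpRequest = HttpRequest.newBuilder(uri)
    .header("Accept", "text/*")
    .GET()
    .build()

  def main(args: Array[String]): Unit = {
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())
    // version() reports the protocol that was actually negotiated
    println(s"${response.statusCode()} ${response.version()}")
    println(response.body().take(200))
  }
}
```

A `200` status next to `HTTP_2` would support the observation in this comment; it says nothing about how the Akka-based sample should be restructured.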

@PKRoma

PKRoma commented Aug 8, 2021

For example, step_001_http_request can be changed to this code, but it's only a hack to make it work:

/*
 * Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
 */

package samples

import akka.Done
import akka.actor._
import akka.http.scaladsl._
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges, ResponsePromise }
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Flow, Sink, Source }

import scala.concurrent.{ Future, Promise }

object Main extends App {

  implicit val actorSystem = ActorSystem("alpakka-samples")

  import actorSystem.dispatcher

  val httpRequest = HttpRequest(uri = "https://api.nasdaq.com/api/screener/stocks?tableonly=true&limit=25&exchange=NASDAQ&download=true")
    .withHeaders(Accept(MediaRanges.`text/*`))

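  // Build the HTTP/2 connection once and reuse it for every request
  val sendRequest: HttpRequest => Future[HttpResponse] =
    singleRequest(Http().connectionTo("api.nasdaq.com").http2())

  val future: Future[Done] =
    Source
      .single(httpRequest) //: HttpRequest
      .mapAsync(1)(sendRequest) //: HttpResponse
      .runWith(Sink.foreach(println))

  // Terminate on failure too; with `map` a failed request leaves the app hanging
  future.onComplete { result =>
    result.failed.foreach(e => println(s"Failed: $e"))
    println("Done!")
    actorSystem.terminate()
  }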

  // Boilerplate to deal with http2 from https://doc.akka.io/docs/akka-http/current/client-side/http2.html
  def singleRequest(connection: Flow[HttpRequest, HttpResponse, Any], bufferSize: Int = 100): HttpRequest => Future[HttpResponse] = {
    val queue =
      Source.queue(bufferSize, OverflowStrategy.dropNew)
        .via(connection)
        .to(Sink.foreach { response =>
          // complete the response promise with the response when it arrives
          val responseAssociation = response.attribute(ResponsePromise.Key).get
          responseAssociation.promise.trySuccess(response)
        })
        .run()

    req => {
      // create a promise of the response for each request and set it as an attribute on the request
      val p = Promise[HttpResponse]()
      queue.offer(req.addAttribute(ResponsePromise.Key, ResponsePromise(p)))
        // return the future response
        .flatMap(_ => p.future)
    }
  }

}
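The `singleRequest` boilerplate is needed because, over one HTTP/2 connection, responses are not guaranteed to come back in request order; as described in the Akka HTTP docs linked in the code comment, the client copies request attributes such as `ResponsePromise` onto the matching response. The plain-Scala sketch below models only that correlation idea (hypothetical `Req`/`Resp` types and a simulated connection that answers in reverse order) to show why each caller still gets its own response.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PromiseCorrelation {
  // Hypothetical stand-ins for HttpRequest/HttpResponse; the promise plays the
  // role of the ResponsePromise attribute that travels from request to response
  final case class Req(path: String, promise: Promise[Resp])
  final case class Resp(body: String, promise: Promise[Resp])

  // Simulated multiplexed connection: answers requests in reverse order,
  // carrying each request's promise over to its response
  def connection(requests: Seq[Req]): Seq[Resp] =
    requests.reverse.map(r => Resp(s"response for ${r.path}", r.promise))

  // Like the Sink.foreach in singleRequest: complete the promise found on each response
  def deliver(responses: Seq[Resp]): Unit =
    responses.foreach(resp => resp.promise.trySuccess(resp))

  // Sends three requests and returns the bodies in the order the requests were made
  def runDemo(): Seq[String] = {
    val requests = Seq("/a", "/b", "/c").map(path => Req(path, Promise[Resp]()))
    val futures: Seq[Future[Resp]] = requests.map(_.promise.future)
    deliver(connection(requests))
    Await.result(Future.sequence(futures), 1.second).map(_.body)
  }

  def main(args: Array[String]): Unit =
    runDemo().foreach(println)
}
```

Even though the simulated connection replies in reverse, each future completes with the response for its own request, which is what lets `singleRequest` hand back a plain `Future[HttpResponse]` per call.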

@ennru
Member

ennru commented Aug 19, 2021

Thank you for digging up the new URL. Would you be in a position to suggest these changes in a PR?
