Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative way of defining Graph Stages #121

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

jorkey
Copy link

@jorkey jorkey commented Jan 24, 2018

Preconditions.

Generally, the processing of the received element from the inlet or the event from the timer results in the writing of the resulting elements to one or more outlets. The event from the inlet or timer should not be processed until every outlet into which the writing can be made, became ready to be written, or its buffer will allow the acceptance of new elements.

Using of low-level pull/grab/push functions to control back-pressure is fraught with errors and complicates the application logic. Also, Akka Streams does not provide today the possibility of specifying a buffer with OverflowStrategy for the outlet in GraphStage with arbitrary Shape.

Decision.

The idea is to describe the relationships between inlets, events from the timer and outlets. For each link, the buffer for outgoing elements and OverflowStrategy can be specified. An element from the inlet or event from the timer enters the processing only when the outlet is available for writing for each link, or OverflowStrategy for the buffer allows you to receive at least one element. If there are no connections for the incoming port or timer, the element from the inlet or event from the timer will be processed immediately.

If the outlet is linked to several inlets, items for writing to the outlet are accumulated in different buffers belonging to different links. When the outlet becomes available for writing, the elements from the buffers will be written in the same order as they were pushed.

Links can be added/deleted at any time. Deleting a link with the outlet does not delete the buffer until it is freed.

Sometimes the business logic of an application requires more than one item to be written to the outlet at same time. In this case, the element is placed to the buffer. This is allowed, but the application should not abuse this possibility.

LinkedLogics class implements the specified logic. LinkedLogics inherits GraphStageLogic, but it is a final class, and can not be inherited by the application. Low-level pull/grab/push functions with access "protected" are not visible from the outside, which completely excludes their use by the application. LinkedLogics represents a new level of management of the back pressure policy within GraphStage.

Linking.

For each inlet in the shape, InputLogic object must be created and added to the LinkedLogics.
The links with the outlets and the handler of the received elements are defined inside the InputLogic.

override val shape = new BidiShape(Inlet[String]("In1"), Outlet[String]("Out1"), Inlet[String]("In2"), Outlet[String]("Out2"))

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
  val logics = new LinkedLogics(shape)

  // Create input processing logic from in1
  val logic1 = new InputLogic(shape.in1) {
    // Create link with out1. If out1 is not available to write, in1 will be back-pressured.
    val out1 = linkOutput(shape.out1)
 
    // The element from in1 will be read and processed when out1 becomes available to write.
    override def inputHandler(packet: String): Unit = {
      out1.push(«from 1 » + packet)
    }
  }
  logics.add(logic1)
  // Create input processing logic from shape.in2
  val logic2 = new InputLogic(shape.in2) {
    // Create link with out1. If out1 is not available to write, in2 will be back pressured.
    val out1 = linkOutput(shape.out1)
    // Create a link to out2 with 100-element buffer. When the buffer is full, in2 will be back pressured.
    val out2 = linkOutput(shape.out2, 100, OverflowStrategy.backpressure) 

    // An element from in2 will be read and processed when out1 becomes available for writing and a place appears in the out1 buffer.
    override def inputHandler(packet: String): Unit = {
      out1.push(«from 2 » + packet)
      out2.push(packet)
    }
  }
  logics.add(logic2)

Once LinkedLogic is started, new links can also be added.
When the link becomes unnecessary, it can be deleted by calling LogicLink.remove()

Using timers.

TimerLinkedLogics except linking inlets and outlets allows to create timers and associate them with outlets. TimerLinkedLogics inherits TimerGraphStageLogic and is the final class.

override val shape = new BidiShape(Inlet[String]("In1"), Outlet[String]("Out1"), Inlet[String]("In2"), Outlet[String]("Out2"))

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
  val logics = new TimerLinkedLogics(shape)
To start once scheduled timer, create and add to the OnceScheduledTimerLogic TimerLinkedLogics.

  val timerLogic = logics.add(new OnceScheduledTimerLogic(period) {
    val out1 = linkOutput(shape.out1)
    override def timerHandler(): Unit = {
      out1.push(«Timer fired»)
    }
  })

To start a timer that periodically runs through a time interval, PeriodicallyTimerLogic is created and added to the TimerLinkedLogics.
The PeriodicallyTimerLogic constructor can specify an initial delay and a time interval.

Just like in the case of InputLogic, you can add and remove links in the process.
The call to TimerLogic .cancel() terminates the timer and removes TimerLogic from TimerLinkedLogics.

Manual control of back pressure.

Application logic may require you to set back pressure on the inlet regardless of the availability of outlets. A special Stopper link creates an eternal back pressure on the incoming port:

var stopper = logic.linkStopper()

To open the input, we need to remove Stopper:

stopper.remove()

@jorkey jorkey changed the title Add LinkedLogics Alternative way of defining Graph Stages Feb 8, 2018
@patriknw
Copy link
Member

Interesting. I wonder if we could come up with a high-level api that is easy to understand for the specific case of stateful Flow shape (one in, one out), and supporting materialized values, async callbacks and timers. I think that case is very common. Based on such experiment we could see if similar could be defined for other common shapes, such Source.

@jorkey
Copy link
Author

jorkey commented Feb 19, 2018

I'm glad that this idea interested you.
Defining LinkedLogics with common shapes is easy.
For example, that is simple «grep» filter:

val shape = new FlowShape(Inlet[String]("In"), Outlet[String]("Out"))

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
  val logics = new LinkedLogics(shape)
  logics.add(new InputLogic(shape.in) {
    val out = linkOutput(shape.out)

    override def inputHandler(packet: String): Unit = {
      if (packet.contains(pattern)) {
        out.push(packet)
      }
    }
  })
  logics
}

Today LinkedLogics does not support async callbacks. It would be nice to add them too.
Because async callbacks do not allow to pause the input, it seems possible to create only buffer links with no OverflowStrategy.backpressure.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants