SparkStructuredStateModelServer.scala
/*
* Copyright (C) 2017-2019 Lightbend
*
* This file is part of the Lightbend model-serving-tutorial (https://github.com/lightbend/model-serving-tutorial)
*
* The model-serving-tutorial is free software: you can redistribute it and/or modify
* it under the terms of the Apache License Version 2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lightbend.modelserving.spark.server

import com.lightbend.model.winerecord.WineRecord
import com.lightbend.modelserving.configuration.ModelServingConfiguration
import com.lightbend.modelserving.model.{ModelToServe, ServingResult}
import com.lightbend.modelserving.spark.{DataWithModel, KafkaSupport, ModelState}
import com.lightbend.modelserving.winemodel.WineFactoryResolver
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection._

/**
  * Model serving implemented with Spark Structured Streaming, with near-real-time support
  * provided by Spark's "continuous processing" execution mode.
  */
object SparkStructuredStateModelServer {

  import ModelServingConfiguration._

  def main(args: Array[String]): Unit = {

    println(s"Running Spark Model Server. Kafka: $KAFKA_BROKER")

    // Create the Spark session
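    // Note: ModelStateRegistrator (configured below) registers ModelState with Kryo;
    // ModelState wraps a live model instance.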
    val sparkSession = SparkSession.builder
      .appName("SparkModelServer")
      .master("local[3]") // TODO: In production code, don't hard code a value here
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrator", "com.lightbend.modelserving.spark.ModelStateRegistrator")
      .config("spark.sql.streaming.checkpointLocation", CHECKPOINT_DIR)
      .getOrCreate()

    sparkSession.sparkContext.setLogLevel("ERROR")
    import sparkSession.implicits._
    // Configure the factory resolver used to instantiate wine models
    ModelToServe.setResolver(WineFactoryResolver)

    val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(1))
    ssc.checkpoint("./cpt")
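    // Note that two checkpoint locations are in play: CHECKPOINT_DIR (configured above)
    // for the Structured Streaming query, and ./cpt for this classic streaming context.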
    // Register a UDF that deserializes raw Kafka bytes into wine records
    sparkSession.udf.register("deserializeData", (data: Array[Byte]) => DataWithModel.dataWineFromByteArrayStructured(data))
    // Current state of data models
    val currentModels = mutable.Map[String, ModelState]()
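    // This map lives in the driver and is captured in the scoring query's closure;
    // the query must be restarted (below) for model updates to become visible to tasks.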
    // Create the data stream
    val datastream = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKER)
      .option("subscribe", DATA_TOPIC)
      .option(ConsumerConfig.GROUP_ID_CONFIG, DATA_GROUP)
      .option(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .option("startingOffsets", "earliest")
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("""deserializeData(value) AS data""")
      .select("data.fixedAcidity", "data.volatileAcidity", "data.citricAcid", "data.residualSugar",
        "data.chlorides", "data.freeSulfurDioxide", "data.totalSulfurDioxide", "data.density", "data.pH",
        "data.sulphates", "data.alcohol", "data.dataType", "data.ts")
      .as[WineRecord]
      .map(data => {
        data.dataType match {
          case dtype if dtype != "" =>
            currentModels.get(data.dataType) match {
              case Some(state) =>
                // Score the record; the third argument is the elapsed time since the record's timestamp
                val result = state.model.score(data)
                ServingResult[Double](state.name, data.dataType, System.currentTimeMillis() - data.ts, result)
              case _ => ServingResult[Double]("No model available", "", 0, 0.0)
            }
          case _ => ServingResult[Double]("Bad input record", "", 0, 0.0)
        }
      }).as[ServingResult[Double]]
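    // The scoring query is held in a var and wrapped in a small helper because it is
    // stopped and restarted (below) whenever new models arrive; the restart serializes
    // the updated currentModels map into the new tasks.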
    def startDataQuery() = datastream
      .writeStream.outputMode("update")
      .format("console").option("truncate", false).option("numRows", 10) // 10 is the default
      .trigger(Trigger.Continuous("5 second"))
      .start()

    var dataQuery = startDataQuery()
    // Create the models Kafka stream using the classic DStream API
    val kafkaParams = KafkaSupport.getKafkaConsumerConfig(KAFKA_BROKER)
    val modelsStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](ssc, PreferConsistent,
      Subscribe[Array[Byte], Array[Byte]](Set(MODELS_TOPIC), kafkaParams))
    // Process model updates with classic Spark Streaming, which gives us direct,
    // driver-side control over each batch of incoming models
    modelsStream.foreachRDD(rdd =>
      if (!rdd.isEmpty()) {
        val models = rdd.map(_.value).collect
          .map(ModelToServe.fromByteArray(_)).filter(_.isSuccess).map(_.get)
        val newModels = models.flatMap(modelToServe => {
          println(s"New model $modelToServe")
          // Instantiate the new model, skipping it if no factory exists or creation fails
          WineFactoryResolver.getFactory(modelToServe.modelType) match {
            case Some(factory) =>
              factory.create(modelToServe).map(m => (modelToServe.dataType, ModelState(modelToServe.name, m)))
            case _ => None
          }
        }).toMap
        // Merge the new models into the current map; the updated map is picked up by
        // the scoring query when it is restarted below
        newModels.foreach { case (name, value) =>
          if (currentModels.contains(name))
            currentModels(name).model.cleanup()
          currentModels(name) = value
        }
        // Stop the currently running structured query so that we can update the models
        // stored in the driver. When the query is restarted below, the new models are
        // serialized into the tasks created for the restarted job.
        if (dataQuery.isActive) {
          println("Stopping data query")
          dataQuery.stop()
        }
        // Restart the streaming query with the updated models map
        println("Starting data query")
        dataQuery = startDataQuery()
      }
    )
    // Start the streaming context and block until termination
    ssc.start()
    ssc.awaitTermination()
  }
}