Skip to content

Commit

Permalink
Merge pull request #280 from slicebox/bugfix/#279-import-session-bugs
Browse files Browse the repository at this point in the history
Bugfix/#279 import session bugs
  • Loading branch information
KarlSjostrand committed May 9, 2018
2 parents 4fc7cbf + 974f474 commit 32b17b2
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 52 deletions.
3 changes: 1 addition & 2 deletions build.sbt
Expand Up @@ -85,7 +85,6 @@ libraryDependencies ++= {
"com.typesafe.slick" %% "slick-hikaricp" % slickVersion,
"com.h2database" % "h2" % "1.4.197",
"mysql" % "mysql-connector-java" % "6.0.6",
"com.zaxxer" % "HikariCP" % "2.7.8",
"com.github.t3hnar" %% "scala-bcrypt" % "3.1",
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.312",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
Expand All @@ -99,7 +98,7 @@ libraryDependencies ++= {
"org.webjars" % "angularjs" % "1.5.11",
"org.webjars" % "angular-material" % "1.1.5",
"org.webjars" % "angular-file-upload" % "11.0.0",
"se.nimsa" %% "dcm4che-streams" % "0.6" exclude("org.slf4j", "slf4j-simple"),
"se.nimsa" %% "dcm4che-streams" % "0.7" exclude("org.slf4j", "slf4j-simple"),
"com.lightbend.akka" %% "akka-stream-alpakka-s3" % alpakkaVersion,
"com.lightbend.akka" %% "akka-stream-alpakka-file" % alpakkaVersion
)
Expand Down
20 changes: 12 additions & 8 deletions src/main/assets/js/import.js
Expand Up @@ -74,43 +74,47 @@ angular.module('slicebox.import', ['ngRoute', 'ngFileUpload'])
return fileOrDirectory.type !== 'directory';
});

var nConcurrent = Math.min(10, files.length);
var nUploading = 0;

$scope.uiState.currentFileSet.processing = true;
$scope.uiState.currentFileSet.index = 0;
$scope.uiState.currentFileSet.total = files.length;
$scope.uiState.currentFileSet.progress = 0;

var prepareNext = function() {
files.shift();
if (files.length) {
next();
} else if ($scope.uiState.currentFileSet.processing) {
next(files.shift());
} else if (nUploading <= 0) {
$scope.uiState.currentFileSet.processing = false;
$scope.callbacks.importSessionsTable.reloadPage();
}
};

var next = function() {
var next = function(file) {
$scope.uiState.currentFileSet.index++;
$scope.uiState.currentFileSet.progress = Math.round(100 * $scope.uiState.currentFileSet.index / $scope.uiState.currentFileSet.total);

nUploading += 1;

Upload.upload({
url: '/api/import/sessions/' + $scope.uiState.selectedSession.id + '/images',
data: {file: files[0]}
data: {file: file}
}).success(function () {
nUploading -= 1;
prepareNext();
}).error(function (message, status) {
if (status >= 300 && status !== 400) {
sbxToast.showErrorMessage(message);
}
nUploading -= 1;
prepareNext();
});
};

// start nConcurrent uploads
var nConcurrent = Math.min(6, files.length);
for (var i = 0; i < nConcurrent; i++) {
next();
files.shift();
next(files.shift());
}
};

Expand Down
14 changes: 7 additions & 7 deletions src/main/scala/se/nimsa/sbx/app/routing/ImportRoutes.scala
Expand Up @@ -16,13 +16,11 @@

package se.nimsa.sbx.app.routing

import akka.http.scaladsl.model.EntityStreamException
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.directives.FileInfo
import akka.pattern.ask
import akka.stream.AbruptIOTerminationException
import akka.stream.scaladsl.{Source => StreamSource}
import akka.util.ByteString
import org.dcm4che3.io.DicomStreamException
Expand Down Expand Up @@ -124,12 +122,14 @@ trait ImportRoutes {
fileInfo match {
case Some(fi) =>
SbxLog.error("Import", s"${failure.getClass.getSimpleName} during import of ${fi.fileName}: ${failure.getMessage}")
importService.ask(UpdateSessionWithRejection(importSession))
complete((status, s"${fi.fileName}: ${failure.getMessage}"))
onComplete(importService.ask(UpdateSessionWithRejection(importSession.id))) {
_ => complete((status, s"${fi.fileName}: ${failure.getMessage}"))
}
case None =>
SbxLog.error("Import", s"${failure.getClass.getSimpleName} during import: failure.getMessage")
importService.ask(UpdateSessionWithRejection(importSession))
complete((status, failure.getMessage))
SbxLog.error("Import", s"${failure.getClass.getSimpleName} during import: ${failure.getMessage}")
onComplete(importService.ask(UpdateSessionWithRejection(importSession.id))) {
_ => complete((status, failure.getMessage))
}
}
}
case None =>
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/se/nimsa/sbx/importing/ImportProtocol.scala
Expand Up @@ -43,7 +43,7 @@ object ImportProtocol {

case class AddImageToSession(importSessionId: Long, image: Image, overwrite: Boolean) extends ImportSessionRequest

case class UpdateSessionWithRejection(importSession: ImportSession) extends ImportSessionRequest
case class UpdateSessionWithRejection(importSessionId: Long) extends ImportSessionRequest

case class GetImportSessions(startIndex: Long, count: Long) extends ImportSessionRequest

Expand Down
77 changes: 43 additions & 34 deletions src/main/scala/se/nimsa/sbx/importing/ImportServiceActor.scala
Expand Up @@ -16,81 +16,90 @@

package se.nimsa.sbx.importing

import akka.actor.Actor
import akka.event.Logging
import akka.actor.Props
import akka.event.LoggingReceive
import akka.util.Timeout
import akka.actor.{Actor, Props, Stash}
import akka.event.{Logging, LoggingReceive}
import akka.pattern.pipe
import se.nimsa.sbx.importing.ImportProtocol._
import se.nimsa.sbx.lang.NotFoundException
import se.nimsa.sbx.util.ExceptionCatching
import se.nimsa.sbx.util.FutureUtil.await
import se.nimsa.sbx.util.SbxExtensions._
import se.nimsa.sbx.util.{ExceptionCatching, SequentialPipeToSupport}

class ImportServiceActor(importDao: ImportDAO)(implicit timeout: Timeout) extends Actor with ExceptionCatching {
val log = Logging(context.system, this)
import scala.concurrent.{ExecutionContext, Future}

class ImportServiceActor(importDao: ImportDAO) extends Actor with ExceptionCatching with Stash with SequentialPipeToSupport {

import context.system

implicit val ec: ExecutionContext = system.dispatcher

val log = Logging(context.system, this)
log.info("Import service started")

override def receive = LoggingReceive {

case msg: ImportSessionRequest => catchAndReport {
case msg: ImportSessionRequest =>
msg match {

case AddImportSession(importSession) =>
await(importDao.importSessionForName(importSession.name)) match {
importDao.importSessionForName(importSession.name).flatMap {
case Some(existingImportSession) if existingImportSession.userId != importSession.userId =>
throw new IllegalArgumentException(s"An import session with name ${existingImportSession.name} and user ${existingImportSession.user} already exists")
Future.failed(new IllegalArgumentException(s"An import session with name ${existingImportSession.name} and user ${existingImportSession.user} already exists"))
case Some(existingImportSession) =>
sender ! existingImportSession
Future.successful(existingImportSession)
case None =>
val newImportSession = importSession.copy(filesImported = 0, filesAdded = 0, filesRejected = 0, created = now, lastUpdated = now)
sender ! await(importDao.addImportSession(newImportSession))
}
importDao.addImportSession(newImportSession)
}.pipeSequentiallyTo(sender)

case GetImportSessions(startIndex, count) =>
sender ! ImportSessions(await(importDao.getImportSessions(startIndex, count)))
pipe(importDao.getImportSessions(startIndex, count).map(ImportSessions)).to(sender)

case GetImportSession(id) =>
sender ! await(importDao.getImportSession(id))
pipe(importDao.getImportSession(id)).to(sender)

case DeleteImportSession(id) =>
sender ! await(importDao.removeImportSession(id))
importDao.removeImportSession(id).pipeSequentiallyTo(sender)

case GetImportSessionImages(id) =>
sender ! ImportSessionImages(await(importDao.listImagesForImportSessionId(id)))
pipe(importDao.listImagesForImportSessionId(id).map(ImportSessionImages)).to(sender)

case AddImageToSession(importSessionId, image, overwrite) =>
await(importDao.getImportSession(importSessionId)) match {
importDao.getImportSession(importSessionId).flatMap {
case Some(importSession) =>
val importSessionImage =
if (overwrite) {
val updatedImportSession = importSession.copy(filesImported = importSession.filesImported + 1, lastUpdated = now)
await(importDao.updateImportSession(updatedImportSession))
await(importDao.importSessionImageForImportSessionIdAndImageId(importSession.id, image.id))
.getOrElse(await(importDao.insertImportSessionImage(ImportSessionImage(-1, updatedImportSession.id, image.id))))
importDao.updateImportSession(updatedImportSession)
.flatMap { _ =>
importDao.importSessionImageForImportSessionIdAndImageId(importSession.id, image.id)
.flatMap(_
.map(Future.successful)
.getOrElse(importDao.insertImportSessionImage(ImportSessionImage(-1, updatedImportSession.id, image.id))))
}
} else {
val updatedImportSession = importSession.copy(filesImported = importSession.filesImported + 1, filesAdded = importSession.filesAdded + 1, lastUpdated = now)
await(importDao.updateImportSession(updatedImportSession))
await(importDao.insertImportSessionImage(ImportSessionImage(-1, updatedImportSession.id, image.id)))
importDao.updateImportSession(updatedImportSession)
.flatMap(_ => importDao.insertImportSessionImage(ImportSessionImage(-1, updatedImportSession.id, image.id)))
}
sender ! ImageAddedToSession(importSessionImage)
importSessionImage.map(ImageAddedToSession)
case None =>
throw new NotFoundException(s"Import session not found for id $importSessionId")
}
Future.failed(new NotFoundException(s"Import session not found for id $importSessionId"))
}.pipeSequentiallyTo(sender)

case UpdateSessionWithRejection(importSession) =>
val updatedImportSession = importSession.copy(filesRejected = importSession.filesRejected + 1, lastUpdated = now)
sender ! await(importDao.updateImportSession(updatedImportSession))
case UpdateSessionWithRejection(importSessionId) =>
importDao.getImportSession(importSessionId).map(_.map { importSession =>
val updatedImportSession = importSession.copy(filesRejected = importSession.filesRejected + 1, lastUpdated = now)
importDao.updateImportSession(updatedImportSession)
}).unwrap.pipeSequentiallyTo(sender)

}
}

}

def now = System.currentTimeMillis
def now: Long = System.currentTimeMillis

}

object ImportServiceActor {
def props(importDao: ImportDAO)(implicit timeout: Timeout): Props = Props(new ImportServiceActor(importDao))
def props(importDao: ImportDAO): Props = Props(new ImportServiceActor(importDao))
}

0 comments on commit 32b17b2

Please sign in to comment.