IT박스

예외가 발생하면 Akka Actor가 종료되지 않음

itboxs 2020. 10. 25. 11:51
반응형

예외가 발생하면 Akka Actor가 종료되지 않음


나는 현재 Akka를 시작하려고 노력하고 있으며 이상한 문제에 직면하고 있습니다. 내 액터에 대한 다음 코드가 있습니다.

class AkkaWorkerFT extends Actor {
  def receive = {
    case Work(n, c) if n < 0 => throw new Exception("Negative number")
    case Work(n, c) => self reply n.isProbablePrime(c);
  }
}

그리고 이것이 제가 일꾼을 시작하는 방법입니다.

val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start());
val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()

그리고 이것이 내가 모든 것을 종료하는 방법입니다.

futures.foreach( _.await )
router ! Broadcast(PoisonPill)
router ! PoisonPill

이제 작업자에게 n> 0 (예외가 발생하지 않음) 메시지를 보내면 모든 것이 제대로 작동하고 응용 프로그램이 제대로 종료됩니다. 그러나 예외가 발생하는 단일 메시지를 보내 자마자 여전히 액터가 실행 중이므로 응용 프로그램이 종료되지 않지만 출처를 알 수 없습니다.

도움이되는 경우 다음은 해당 스레드의 스택입니다.

  Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended) 
    Unsafe.park(boolean, long) line: not available [native method]  
    LockSupport.park(Object) line: 158  
    AbstractQueuedSynchronizer$ConditionObject.await() line: 1987   
    LinkedBlockingQueue<E>.take() line: 399 
    ThreadPoolExecutor.getTask() line: 947  
    ThreadPoolExecutor$Worker.run() line: 907   
    MonitorableThread(Thread).run() line: 680   
    MonitorableThread.run() line: 182   

추신 : 종료되지 않는 스레드는 작업자 스레드가 아닙니다. postStop 콜백을 추가했기 때문에 모든 스레드가 제대로 중지됩니다.

PPS : Actors.registry.shutdownAll문제를 해결하지만 shutdownAll은 마지막 수단으로 만 사용해야한다고 생각합니다. 그렇지 않습니까?


akka 행위자 내부의 문제를 처리하는 적절한 방법은 예외를 던지는 것이 아니라 감독자 계층을 설정하는 것입니다.

"동시 코드에서 예외를 던지면 (연결되지 않은 액터를 사용한다고 가정 해 보겠습니다) 현재 액터를 실행하는 스레드를 그냥 날려 버릴 것입니다.

(스택 추적 검사를 제외하고) 문제가 발생했는지 알아낼 방법이 없습니다. 당신이 할 수있는 일은 아무것도 없습니다. "

(120)을 통해 관리자 계층 내결함성

* 참고 * 위의 내용은 이전 버전의 Akka (1.2)에 해당됩니다. 최신 버전 (예 : 2.2)에서는 여전히 감독자 계층 구조를 설정하지만 하위 프로세스에서 발생하는 예외를 트랩합니다. 예 :

class Child extends Actor {
    var state = 0
    def receive = {
      case ex: Exception ⇒ throw ex
      case x: Int        ⇒ state = x
      case "get"         ⇒ sender ! state
    }
  }

그리고 감독자에서 :

class Supervisor extends Actor {
    import akka.actor.OneForOneStrategy
    import akka.actor.SupervisorStrategy._
    import scala.concurrent.duration._

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
        case _: ArithmeticException      ⇒ Resume
        case _: NullPointerException     ⇒ Restart
        case _: IllegalArgumentException ⇒ Stop
        case _: Exception                ⇒ Escalate
      }

    def receive = {
      case p: Props ⇒ sender ! context.actorOf(p)
    }
  }

(2.2)를 통해 관리자 계층 내결함성


Viktor가 제안한 것처럼 일이 종료되는지 확인하기 위해 로깅을 끄는 것은 약간 이상합니다. 대신 할 수있는 일은 다음과 같습니다.

EventHandler.shutdown()

예외 후 세계를 계속 실행하는 모든 (로거) 리스너를 깨끗하게 종료합니다.

def shutdown() {
  foreachListener(_.stop())
  EventHandlerDispatcher.shutdown()
}

로거의 회전 akka.conf

참고 URL : https://stackoverflow.com/questions/6170227/akka-actor-not-terminating-if-an-exception-is-thrown

반응형