IT박스

미래가 될 때까지 기다려

itboxs 2021. 1. 8. 08:02
반응형

미래가 될 때까지 기다려 수행


실행중인 비동기 작업이 거의없고 그중 하나 이상이 완료 될 때까지 기다려야합니다 (미래에는 N 개의 작업 중 util M이 완료 될 때까지 기다려야합니다). 현재 그들은 Future로 제시되어 있으므로 다음과 같은 것이 필요합니다.

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

이와 같은 것이 있습니까? 또는 유사한 것, 미래에는 필요하지 않습니다. 현재 나는 선물 수집을 반복하고, 완료되었는지 확인한 다음 잠시 잠을 자고 다시 확인합니다. 이것은 최선의 해결책이 아닌 것 같습니다. 왜냐하면 내가 오랫동안 수면을 취하면 원치 않는 지연이 추가되고 짧은 시간 동안 수면을 취하면 성능에 영향을 미칠 수 있기 때문입니다.

나는 사용해 볼 수 있었다

new CountDownLatch(1)

작업이 완료되면 카운트 다운을 줄이고

countdown.await()

하지만 미래 창조를 통제해야만 가능하다는 것을 알았습니다. 가능하지만 현재 작업 생성 로직 (Callable을 ExecutorService로 전송)이 어떤 Future를 기다리는 지 결정과 분리되어 있기 때문에 시스템 재 설계가 필요합니다. 나는 또한 재정의 할 수있다

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

작업이 완료 될 때 알림을받을 리스너를 연결하는 기능을 사용하여 RunnableFuture의 사용자 지정 구현을 만든 다음 필요한 작업에 이러한 리스너를 연결하고 CountDownLatch를 사용합니다. 그러나 이는 내가 사용하는 모든 ExecutorService에 대해 newTaskFor를 재정의해야 함을 의미하며 잠재적으로 구현이있을 것입니다. AbstractExecutorService를 확장하지 않습니다. 동일한 목적으로 주어진 ExecutorService를 래핑 할 수도 있지만, Futures를 생성하는 모든 메서드를 장식해야합니다.

이 모든 솔루션은 작동하지만 매우 부 자연스럽게 보입니다. 간단한 것을 놓친 것 같습니다.

WaitHandle.WaitAny(WaitHandle[] waitHandles)

C #에서. 이러한 종류의 문제에 대한 잘 알려진 해결책이 있습니까?

최신 정보:

원래 퓨처 크리에이션에 대한 접근 권한이 전혀 없었기 때문에 우아한 솔루션이 없었습니다. 시스템을 재 설계 한 후 Future 생성에 액세스하고 실행 프로세스에 countDownLatch.countdown ()을 추가 할 수있었습니다. 그러면 countDownLatch.await ()를 사용할 수 있으며 모든 것이 잘 작동합니다. 다른 답변을 주셔서 감사합니다. ExecutorCompletionService에 대해 몰랐으며 실제로 유사한 작업에 도움이 될 수 있지만,이 특별한 경우에는 실행자없이 일부 Future가 생성되기 때문에 사용할 수 없었습니다. 실제 작업은 네트워크를 통해 다른 서버로 전송됩니다. 원격으로 완료되고 완료 알림이 수신됩니다.


내가 아는 한 Java는 WaitHandle.WaitAny메소드와 유사한 구조가 없습니다 .

이것은 "WaitableFuture"데코레이터를 통해 달성 될 수있는 것 같습니다.

public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

실행 코드 앞에 삽입 할 수있는 경우에만 작동하지만 그렇지 않으면 실행 코드에 새 doTask()메서드 가 없기 때문 입니다. 그러나 실행 전에 Future 객체를 어떻게 든 제어 할 수 없다면 폴링 없이는 이것을 할 수있는 방법이 없습니다.

또는 미래가 항상 자체 스레드에서 실행되고 어떻게 든 해당 스레드를 얻을 수 있습니다. 그런 다음 새 스레드를 생성하여 서로 결합한 다음 결합이 반환 된 후 대기 메커니즘을 처리 할 수 ​​있습니다. 이것은 정말 추하고 많은 오버 헤드를 유발합니다. 일부 Future 객체가 완료되지 않으면 죽은 스레드에 따라 차단 된 스레드가 많이있을 수 있습니다. 주의하지 않으면 메모리와 시스템 리소스가 누출 될 수 있습니다.

/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}

간단하고 ExecutorCompletionService를 확인하십시오 .


ExecutorService.invokeAny


결과 대기열을 만들고 대기열에서 기다리지 않는 이유는 무엇입니까? 또는 더 간단하게 CompletionService를 사용하십시오. ExecutorService + 결과 큐입니다.


wait () 및 notifyAll ()을 사용하면 실제로 매우 쉽습니다.

먼저 잠금 개체를 정의합니다. (당신은 이것에 대해 어떤 클래스를 사용할 수 있지만 명시적인 것을 좋아합니다) :

package com.javadude.sample;

public class Lock {}

다음으로 작업자 스레드를 정의하십시오. 처리가 완료되면 잠금 개체에 알려야합니다. 알림은 잠금 개체에 대해 동기화 된 블록 잠금에 있어야합니다.

package com.javadude.sample;

public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // do real work -- using a sleep here to simulate work
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " is done... notifying");
        // notify whoever is waiting, in this case, the client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

마지막으로 클라이언트를 작성할 수 있습니다.

package com.javadude.sample;

public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);

        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // need to do the start here so we grab the lock, just
                        //   in case one of the threads is fast -- if we had done the
                        //   starts outside the synchronized block, a fast thread could
                        //   get to its notification *before* the client is waiting for it
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notified!");
            }
        }
        System.out.println("Everyone has notified me... I'm done");
    }
}

어느 것이 끝나는 지 신경 쓰지 않기 때문에 모든 스레드에 대해 하나의 WaitHandle을 가지고 기다리지 않는 이유는 무엇입니까? 어느 쪽이 먼저 끝나 든 핸들을 설정할 수 있습니다.


이 옵션을 참조하십시오.

public class WaitForAnyRedux {

private static final int POOL_SIZE = 10;

public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {

    List<Callable<T>> callables = new ArrayList<Callable<T>>();
    for (final T t : collection) {
        Callable<T> callable = Executors.callable(new Thread() {

            @Override
            public void run() {
                synchronized (t) {
                    try {
                        t.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }, t);
        callables.add(callable);
    }

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
    ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
    return executorService.invokeAny(callables);
}

static public void main(String[] args) throws InterruptedException, ExecutionException {

    final List<Integer> integers = new ArrayList<Integer>();
    for (int i = 0; i < POOL_SIZE; i++) {
        integers.add(i);
    }

    (new Thread() {
        public void run() {
            Integer notified = null;
            try {
                notified = waitForAny(integers);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("notified=" + notified);
        }

    }).start();


    synchronized (integers) {
        integers.wait(3000);
    }


    Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
    System.out.println("Waking up " + randomInt);
    synchronized (randomInt) {
        randomInt.notify();
    }
  }
}

ReferenceURL : https://stackoverflow.com/questions/117690/wait-until-any-of-futuret-is-done

반응형