programing

rxjs 가변 응답 시간을 가진 엔드포인트의 주기적 폴링

prostudy 2022. 3. 21. 09:06
반응형

rxjs 가변 응답 시간을 가진 엔드포인트의 주기적 폴링

나는 1초에 한 번 이상, 그리고 끝점을 폴링하는 데 걸리는 시간보다 더 느리지 않게 끝점을 폴링하고 싶다.한 가지 이상의 미결 요청이 있어서는 안 된다.

나는 적어도 1초에 한 번 엔드포인트를 폴링할 수 있는 반응형 프로그래밍 방식을 원하지만 엔드포인트가 1초 이상 걸리면 다음 요청은 즉시 실행된다.

아래 대리석 도표에서 2차, 3차 요청은 1초 이상 걸리지만 4차, 5차 요청은 더 빨리 마무리된다.다음 요청은 1초 경계에서 발생하거나 마지막 미결 요청으로부터 데이터를 얻는 즉시 발사된다.

s---s---s---s---s---s---| # 1 second interval observable
r---r----r--------r-r---| # endpoint begin polling events
-d-------d--------dd-d--| # endpoint data response events

대리석 도표에서 용어를 정확하게 맞추려고 하기 때문에 엔드포인트 요청의 시작은 대리석 I 라벨 "r"이고, 대리석 이벤트 라벨 "d"는 엔드포인트 데이터를 가지고 있다고 가정하고 있다.

이것을 하기 위해 얼마나 많은 코드가 필요했는가를 여기 들어보자. 그러나 내가 위에서 요청한 대로 그 이후의 요청은 즉시 실행되지 않는다.

var poll;
var previousData;
var isPolling = false;
var dashboardUrl = 'gui/metrics/dashboard';
var intervalMs = updateServiceConfig.getIntervalInMilliSecondForCharts();

return {
    startInterval: startInterval,
    stopInterval: stopInterval
};

function startInterval() {
    stopInterval();
    tryPolling(); // immediately hit the dashboard
    // attempt polling at the interval
    poll = $interval(tryPolling, intervalMs);
}

/**
 * attempt polling as long as there is no in-flight request
 * once the in-flight request completes or fails, allow the next request to be processed
 */
function tryPolling() {
    if (!isPolling) {
        isPolling = true;

        getDashboard()
        // if the dashboard either returns successful or fails, reset the polling boolean
            .then(resetPolling, resetPolling);
    }
}

/** there's no longer an in-flight request, so reset the polling boolean */
function resetPolling() {
    isPolling = false;
}

function stopInterval() {
    if (poll) {
        $interval.cancel(poll);
        poll = undefined;
    }
}

function getDashboard() {
    return restfulService.get(dashboardUrl)
        .then(updateDashboard);
}

function updateDashboard(data) {
    if (!utils.deepEqual(data, previousData)) {
        previousData = angular.copy(data);
        $rootScope.$broadcast('$dashboardLoaded', data);
    }
}

여기 내 해결책이 있다.내부적인 주제를 사용하지만combineLatest그리고filter응답의 도착이 다음보다 느릴 경우 요청이 누적되지 않도록 하기 위해timer마침표를 찍다

코멘트는 어떻게 작동하는지 설명해야 한다.

const delays = [100, 2000, 100, 3000];
const since = Date.now();
let index = 0;

function mock() {
    return Rx.Observable
    .of("res")
    .do(() => console.log("mock req at ", Date.now() - since, " ms"))
    .delay(delays[index++ % delays.length])
    .do(() => console.log("mock res at ", Date.now() - since, " ms"));
}

function poll() {

  return Rx.Observable.defer(() => {

    // Use defer so that the internal subject is created for each
    // subscription.
    const subject = new Rx.BehaviorSubject({ tick: -1, pending: false });

    return Rx.Observable
    
      // Combine the timer and the subject's state.
      .combineLatest(
        Rx.Observable.timer(0, 1000).do(tick => console.log("tick", tick)),
        subject
      )

      // Filter out combinations in which either a more recent tick
      // has not occurred or a request is pending.
      .filter(([tick, state]) => (tick !== state.tick) && !state.pending)

      // Update the subject's state.
      .do(([tick]) => subject.next({ tick, pending: true }))
      
      // Make the request and use the result selector to combine
      // the tick and the response.
      .mergeMap(([tick]) => mock(), ([tick], resp) => [tick, resp])

      // Update the subject's state.
      .do(([tick]) => subject.next({ tick, pending: false }))
      
      // Map the response.
      .map(([tick, resp]) => resp);
  });
}

poll().take(delays.length).subscribe(r => console.log(r));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>


정확히 이렇게 하는 운영자가 있다는 생각이 방금 들었다.exhaustMap.

const delays = [100, 2000, 100, 3000];
const since = Date.now();
let index = 0;

function mock() {
  return Rx.Observable
    .of("res")
    .do(() => console.log("mock req at ", Date.now() - since, " ms"))
    .delay(delays[index++ % delays.length])
    .do(() => console.log("mock res at ", Date.now() - since, " ms"));
}

const poll = Rx.Observable
  .timer(0, 1000)
  .do(tick => console.log("tick", tick))
  .exhaustMap(() => mock());

poll.take(delays.length).subscribe(r => console.log(r));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

난 이게 네가 원하는 걸 한다고 믿어:

let counter = 0;
function apiCall() {
  const delay = Math.random() * 1000;
  const count = ++counter;
  return Rx.Observable.timer(delay).mapTo(count);
}

Rx.Observable.timer(0, 1000)
  .mergeMap(() => apiCall())
  .take(1)
  .repeat()
  .subscribe(x => { console.log(x); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

  • timer(0, 1000): 그 직후 1초 간격으로 방출한다.
  • mergeMap(...): api 호출에 의해 반환되는 관측 가능으로 전환한다.이렇게 하면 각 재시도마다 관찰할 수 있는 새로운 결과가 생성될 것이다.재시도할 때마다 새 항목을 만들지 않으려면 이 항목을 다음으로 바꾸십시오.mergeMapTo(apiCall()).
  • take(1): api를 내보낸 후 타이머가 실행되지 않도록 구독을 강제로 완료
  • repeat(): api가 방출되면 시퀀스 다시 시작

그래서 즉시 api로 전화할 것이다.1초 이내에 돌아오지 않으면 매초마다 다른 전화가 걸려올 것이다.일단 api 호출에서 응답이 발생하면 타이머가 취소되고 전체 시퀀스가 다시 시작된다.이것은 당신의 의도에 부합한다고 믿어지는 기내 요청을 취소하지 않을 것이다.

편집: 이전 요청 이전에 나중에 요청이 반환되는 경우 이전 요청은 취소됨

Rxj에만 근거하고 부작용(변수할당 없음)이 없는, 그리고 허리 압박이 없는 답을 내놓기 전에 15mn 동안 그것에 대해 생각해 보아야 했다!

const { Observable } = Rx;

const mockHttpRequest = url =>
  Observable
    .of('ok')
    .do(x => console.log('fetching...'))
    .delay(250);

const poll = (httpRequest$, ms) => {
  const tick$ = Observable.timer(ms);

  return Observable
    .zip(httpRequest$, tick$)
    .repeat()
    .map(([httpResult]) => httpResult);
};

poll(mockHttpRequest('your-url-here'), 1000)
  .do(console.log)
  .subscribe();

여기 일하는 플렁크르: https://plnkr.co/edit/sZTjLedNCE64bgLNhnaS?p=preview

참조URL: https://stackoverflow.com/questions/48212752/rxjs-periodic-polling-of-an-endpoint-with-a-variable-response-time

반응형