vendor/okvpn/cron-bundle/src/Middleware/CronMiddlewareEngine.php line 120

Open in your IDE?
  1. <?php
  2. declare(strict_types=1);
  3. namespace Okvpn\Bundle\CronBundle\Middleware;
  4. use Okvpn\Bundle\CronBundle\Cron\CronChecker;
  5. use Okvpn\Bundle\CronBundle\Model\EnvelopeTools as ET;
  6. use Okvpn\Bundle\CronBundle\Model\EnvironmentStamp;
  7. use Okvpn\Bundle\CronBundle\Model\PeriodicalStampInterface;
  8. use Okvpn\Bundle\CronBundle\Model\ScheduleEnvelope;
  9. use Okvpn\Bundle\CronBundle\Model\ScheduleStamp;
  10. use Okvpn\Bundle\CronBundle\Runner\ScheduleLoopInterface;
  11. use Okvpn\Bundle\CronBundle\Runner\TimerStorage;
  12. use Okvpn\Bundle\CronBundle\Utils\CronUtils;
  13. use Psr\Clock\ClockInterface as PsrClockInterface;
  14. final class CronMiddlewareEngine implements MiddlewareEngineInterface
  15. {
  16.     private $timeZone;
  17.     private $checker;
  18.     private $clock;
  19.     /** @var ScheduleLoopInterface|null */
  20.     private $scheduleLoop;
  21.     private $lastLoopTasks = [];
  22.     private $timers;
  23.     public function __construct(CronChecker $checker, ?string $timeZone null, ?PsrClockInterface $clock null, ?ScheduleLoopInterface $scheduleLoop null, ?TimerStorage $timers null)
  24.     {
  25.         $this->timeZone $timeZone;
  26.         $this->checker $checker;
  27.         $this->clock $clock;
  28.         $this->scheduleLoop $scheduleLoop;
  29.         $this->timers $timers ?? new TimerStorage();
  30.     }
  31.     /**
  32.      * {@inheritdoc}
  33.      */
  34.     public function handle(ScheduleEnvelope $envelopeStackInterface $stack): ScheduleEnvelope
  35.     {
  36.         $env $envelope->has(EnvironmentStamp::class) ? $envelope->get(EnvironmentStamp::class)->toArray() : [];
  37.         // For testing usage. drops next middlewares
  38.         if ($dryRun = ($env['dry-run'] ?? false)) {
  39.             $stack->end();
  40.         }
  41.         if (!$stamp $envelope->get(PeriodicalStampInterface::class)) {
  42.             $dryRun null ET::info($envelope"{{ task }} > Run schedule task.");
  43.             return $stack->next()->handle($envelope$stack);
  44.         }
  45.         $useDemand $env['demand'] ?? false;
  46.         $noLoop $env['no-loop'] ?? false;
  47.         $now = ($env['now'] ?? null) instanceof \DateTimeInterface $env['now'] : $this->getNow();
  48.         return true === $useDemand && null !== $this->scheduleLoop && false === $noLoop ?
  49.             $this->handleDemand($now$envelope$stamp$stack) :
  50.             $this->handleNoDemand($now$envelope$stamp$stack);
  51.     }
  52.     private function handleDemand(\DateTimeInterface $nowScheduleEnvelope $envelopePeriodicalStampInterface $stampStackInterface $stack): ScheduleEnvelope
  53.     {
  54.         $this->lastLoopTasks[$hash ET::calculateHash($envelope)] = 1;
  55.         if ($this->timers->hasTimer($hash)) {
  56.             [$timer$prevEnvelope] = $this->timers->getTimer($hash);
  57.             if ((string)$prevEnvelope->get(PeriodicalStampInterface::class) !== (string) $stamp) {
  58.                 $this->timers->remove($hash);
  59.                 $this->scheduleLoop->cancelTimer($timer);
  60.                 ET::notice($envelope"{{ task }} > Cron expression has been changed.");
  61.             } else {
  62.                 $this->timers->refreshEnvelope($envelope);
  63.                 return $stack->end()->handle($envelope$stack);
  64.             }
  65.         }
  66.         $prevEnvelope $envelope;
  67.         $timers $this->timers;
  68.         $loop $this->scheduleLoop;
  69.         $nextTime $stamp->getNextRunDate($now $loop->now());
  70.         $timers->attach($envelope$runner = static function (/* $periodical = true*/) use ($timers$prevEnvelope$hash$stack$stamp$loop, &$runner): ScheduleEnvelope {
  71.             if (null === ($envelope $timers->findByHash($hash))) {
  72.                 ET::notice($prevEnvelope"{{ task }} > Task canceled. Someone detached an envelope from timers storage");
  73.                 return ($clone = clone $stack)->end()->handle($envelope->without(PeriodicalStampInterface::class), $clone);
  74.             }
  75.             ET::info($envelope"{{ task }} > Run schedule task.");
  76.             try {
  77.                 $result = ($clone = clone $stack)->next()->handle($envelope->without(PeriodicalStampInterface::class), $clone);
  78.             } catch (\Throwable $e) {
  79.                 $result $envelope;
  80.                 ET::error($envelope"{{ task }} > Task ERRORED. {$e->getMessage()}", ['e' => $e]);
  81.             }
  82.             if (false !== (\func_get_args()[0] ?? null)) {
  83.                 $nextTime $stamp->getNextRunDate($now $loop->now());
  84.                 $delay = (float) $nextTime->format('U.u') - (float) $now->format('U.u');
  85.                 $loop->addTimer($delay$runner);
  86.                 ET::debug($envelope\sprintf("{{ task }} > was scheduled with delay %.6F sec."$delay));
  87.             }
  88.             return $result;
  89.         });
  90.         $delay = (float) $nextTime->format('U.u') - (float) $now->format('U.u');
  91.         ET::debug($envelope\sprintf("{{ task }} > was scheduled with delay %.6F sec."$delay));
  92.         $loop->addTimer($delay$runner);
  93.         return ($clone = clone $stack)->end()->handle($envelope$clone);
  94.     }
  95.     public function onLoopEnd(): void
  96.     {
  97.         $this->cancelOrphanTasks();
  98.         $this->lastLoopTasks = [];
  99.     }
  100.     private function handleNoDemand(\DateTimeInterface $nowScheduleEnvelope $envelopePeriodicalStampInterface $stampStackInterface $stack): ScheduleEnvelope
  101.     {
  102.         if ($stamp instanceof ScheduleStamp) {
  103.             try {
  104.                 $isDue $this->checker->isDue($expr $stamp->cronExpression(), $this->timeZone$now);
  105.             } catch (\Throwable $e) {
  106.                 ET::error($envelope"{{ task }} > The cron expression $expr for task is invalid. {$e->getMessage()}", ['e' => $e]);
  107.                 return $stack->end()->handle($envelope$stack);
  108.             }
  109.         } else {
  110.             $currentTime = (int)(60 \floor($now->getTimestamp()/60));
  111.             $now CronUtils::toDate(($currentTime-1), $now);
  112.             $nextRun $stamp->getNextRunDate($now);
  113.             $nextRun = (int)(60 \floor($nextRun->getTimestamp()/60));
  114.             $isDue $nextRun === $currentTime;
  115.         }
  116.         if ($isDue) {
  117.             ET::info($envelope"{{ task }} > The schedule task is due now!");
  118.             return $stack->next()->handle($envelope->without(PeriodicalStampInterface::class), $stack);
  119.         } else {
  120.             ET::debug($envelope"{{ task }} > Skipped the schedule task by cron restriction");
  121.         }
  122.         return $stack->end()->handle($envelope$stack);
  123.     }
  124.     private function cancelOrphanTasks(): void
  125.     {
  126.         foreach ($this->timers->getTimers() as $hash => [$timer$envelope]) {
  127.             if (!isset($this->lastLoopTasks[$hash])) {
  128.                 ET::notice($envelope"{{ task }} > task canceled - is not active anymore");
  129.                 $this->scheduleLoop->cancelTimer($timer);
  130.                 $this->timers->remove($hash);
  131.             }
  132.         }
  133.     }
  134.     private function getNow(): \DateTimeImmutable
  135.     {
  136.         return $this->clock $this->clock->now() : new \DateTimeImmutable('now'$this->timeZone ? new \DateTimeZone($this->timeZone) : null);
  137.     }
  138. }