ActionScheduler
3 years ago
Triggers
3 years ago
Workers
3 years ago
CronHelper.php
3 years ago
CronTrigger.php
3 years ago
CronWorkerInterface.php
3 years ago
CronWorkerRunner.php
3 years ago
CronWorkerScheduler.php
3 years ago
Daemon.php
3 years ago
DaemonActionSchedulerRunner.php
3 years ago
DaemonHttpRunner.php
3 years ago
Supervisor.php
3 years ago
index.php
4 years ago
CronWorkerRunner.php
198 lines
| 1 | <?php // phpcs:ignore SlevomatCodingStandard.TypeHints.DeclareStrictTypes.DeclareStrictTypesMissing |
| 2 | |
| 3 | namespace MailPoet\Cron; |
| 4 | |
| 5 | if (!defined('ABSPATH')) exit; |
| 6 | |
| 7 | |
| 8 | use MailPoet\Entities\ScheduledTaskEntity; |
| 9 | use MailPoet\Logging\LoggerFactory; |
| 10 | use MailPoet\Newsletter\Sending\ScheduledTasksRepository; |
| 11 | use MailPoet\WP\Functions as WPFunctions; |
| 12 | use MailPoetVendor\Carbon\Carbon; |
| 13 | |
| 14 | class CronWorkerRunner { |
| 15 | const TASK_BATCH_SIZE = 5; |
| 16 | const TASK_RUN_TIMEOUT = 120; |
| 17 | const TIMED_OUT_TASK_RESCHEDULE_TIMEOUT = 5; |
| 18 | |
| 19 | /** @var float */ |
| 20 | private $timer; |
| 21 | |
| 22 | /** @var CronHelper */ |
| 23 | private $cronHelper; |
| 24 | |
| 25 | /** @var CronWorkerScheduler */ |
| 26 | private $cronWorkerScheduler; |
| 27 | |
| 28 | /** @var WPFunctions */ |
| 29 | private $wp; |
| 30 | |
| 31 | /** @var ScheduledTasksRepository */ |
| 32 | private $scheduledTasksRepository; |
| 33 | |
| 34 | /** @var LoggerFactory */ |
| 35 | private $loggerFactory; |
| 36 | |
| 37 | public function __construct( |
| 38 | CronHelper $cronHelper, |
| 39 | CronWorkerScheduler $cronWorkerScheduler, |
| 40 | WPFunctions $wp, |
| 41 | ScheduledTasksRepository $scheduledTasksRepository, |
| 42 | LoggerFactory $loggerFactory |
| 43 | ) { |
| 44 | $this->timer = microtime(true); |
| 45 | $this->cronHelper = $cronHelper; |
| 46 | $this->cronWorkerScheduler = $cronWorkerScheduler; |
| 47 | $this->wp = $wp; |
| 48 | $this->scheduledTasksRepository = $scheduledTasksRepository; |
| 49 | $this->loggerFactory = $loggerFactory; |
| 50 | } |
| 51 | |
| 52 | public function run(CronWorkerInterface $worker) { |
| 53 | // abort if execution limit is reached |
| 54 | $this->cronHelper->enforceExecutionLimit($this->timer); |
| 55 | $dueTasks = $this->getDueTasks($worker); |
| 56 | $runningTasks = $this->getRunningTasks($worker); |
| 57 | |
| 58 | if (!$worker->checkProcessingRequirements()) { |
| 59 | foreach (array_merge($dueTasks, $runningTasks) as $task) { |
| 60 | $this->scheduledTasksRepository->remove($task); |
| 61 | $this->scheduledTasksRepository->flush(); |
| 62 | } |
| 63 | return false; |
| 64 | } |
| 65 | |
| 66 | $worker->init(); |
| 67 | |
| 68 | if (!$dueTasks && !$runningTasks) { |
| 69 | if ($worker->scheduleAutomatically()) { |
| 70 | $this->cronWorkerScheduler->schedule($worker->getTaskType(), $worker->getNextRunDate()); |
| 71 | } |
| 72 | return false; |
| 73 | } |
| 74 | |
| 75 | try { |
| 76 | foreach ($dueTasks as $task) { |
| 77 | $this->prepareTask($worker, $task); |
| 78 | } |
| 79 | // Re-fetch running tasks so that we can process tasks that were just prepared |
| 80 | $runningTasks = $this->getRunningTasks($worker); |
| 81 | foreach ($runningTasks as $task) { |
| 82 | $this->processTask($worker, $task); |
| 83 | } |
| 84 | } catch (\Exception $e) { |
| 85 | if (isset($task) && $task && $e->getCode() !== CronHelper::DAEMON_EXECUTION_LIMIT_REACHED) { |
| 86 | /** |
| 87 | * ToDo: Use \LoggerFactory::TOPIC_CRON as logger topic, once it is available |
| 88 | */ |
| 89 | $this->loggerFactory->getLogger()->error($e->getMessage(), ['error' => $e]); |
| 90 | $this->cronWorkerScheduler->rescheduleProgressively($task); |
| 91 | } |
| 92 | throw $e; |
| 93 | } |
| 94 | |
| 95 | return true; |
| 96 | } |
| 97 | |
| 98 | private function getDueTasks(CronWorkerInterface $worker) { |
| 99 | return $this->scheduledTasksRepository->findDueByType($worker->getTaskType(), self::TASK_BATCH_SIZE); |
| 100 | } |
| 101 | |
| 102 | private function getRunningTasks(CronWorkerInterface $worker) { |
| 103 | return $this->scheduledTasksRepository->findRunningByType($worker->getTaskType(), self::TASK_BATCH_SIZE); |
| 104 | } |
| 105 | |
| 106 | private function prepareTask(CronWorkerInterface $worker, ScheduledTaskEntity $task) { |
| 107 | // abort if execution limit is reached |
| 108 | $this->cronHelper->enforceExecutionLimit($this->timer); |
| 109 | |
| 110 | $prepareCompleted = $worker->prepareTaskStrategy($task, $this->timer); |
| 111 | |
| 112 | if ($prepareCompleted) { |
| 113 | $task->setStatus(null); |
| 114 | $this->scheduledTasksRepository->persist($task); |
| 115 | $this->scheduledTasksRepository->flush(); |
| 116 | } |
| 117 | } |
| 118 | |
| 119 | private function processTask(CronWorkerInterface $worker, ScheduledTaskEntity $task) { |
| 120 | // abort if execution limit is reached |
| 121 | $this->cronHelper->enforceExecutionLimit($this->timer); |
| 122 | |
| 123 | if (!$worker->supportsMultipleInstances()) { |
| 124 | if ($this->rescheduleOutdated($task)) { |
| 125 | return false; |
| 126 | } |
| 127 | if ($this->isInProgress($task)) { |
| 128 | return false; |
| 129 | } |
| 130 | } |
| 131 | |
| 132 | $this->startProgress($task); |
| 133 | |
| 134 | try { |
| 135 | $completed = $worker->processTaskStrategy($task, $this->timer); |
| 136 | } catch (\Exception $e) { |
| 137 | $this->stopProgress($task); |
| 138 | throw $e; |
| 139 | } |
| 140 | |
| 141 | if ($completed) { |
| 142 | $this->complete($task); |
| 143 | } |
| 144 | |
| 145 | $this->stopProgress($task); |
| 146 | |
| 147 | return (bool)$completed; |
| 148 | } |
| 149 | |
| 150 | private function rescheduleOutdated(ScheduledTaskEntity $task) { |
| 151 | $currentTime = Carbon::createFromTimestamp($this->wp->currentTime('timestamp')); |
| 152 | |
| 153 | if (empty($task->getUpdatedAt())) { |
| 154 | // missing updatedAt, consider this task outdated (set year to 2000) and reschedule |
| 155 | $updatedAt = Carbon::createFromDate(2000); |
| 156 | } else if (!$task->getUpdatedAt() instanceof Carbon) { |
| 157 | $updatedAt = new Carbon($task->getUpdatedAt()); |
| 158 | } else { |
| 159 | $updatedAt = $task->getUpdatedAt(); |
| 160 | } |
| 161 | |
| 162 | // If the task is running for too long consider it stuck and reschedule |
| 163 | if (!empty($task->getUpdatedAt()) && $updatedAt->diffInMinutes($currentTime, false) > self::TASK_RUN_TIMEOUT) { |
| 164 | $this->stopProgress($task); |
| 165 | $this->cronWorkerScheduler->reschedule($task, self::TIMED_OUT_TASK_RESCHEDULE_TIMEOUT); |
| 166 | return true; |
| 167 | } |
| 168 | return false; |
| 169 | } |
| 170 | |
| 171 | private function isInProgress(ScheduledTaskEntity $task) { |
| 172 | if ($task->getInProgress()) { |
| 173 | // Do not run multiple instances of the task |
| 174 | return true; |
| 175 | } |
| 176 | return false; |
| 177 | } |
| 178 | |
| 179 | private function startProgress(ScheduledTaskEntity $task) { |
| 180 | $task->setInProgress(true); |
| 181 | $this->scheduledTasksRepository->persist($task); |
| 182 | $this->scheduledTasksRepository->flush(); |
| 183 | } |
| 184 | |
| 185 | private function stopProgress(ScheduledTaskEntity $task) { |
| 186 | $task->setInProgress(false); |
| 187 | $this->scheduledTasksRepository->persist($task); |
| 188 | $this->scheduledTasksRepository->flush(); |
| 189 | } |
| 190 | |
| 191 | private function complete(ScheduledTaskEntity $task) { |
| 192 | $task->setProcessedAt(Carbon::createFromTimestamp($this->wp->currentTime('timestamp'))); |
| 193 | $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED); |
| 194 | $this->scheduledTasksRepository->persist($task); |
| 195 | $this->scheduledTasksRepository->flush(); |
| 196 | } |
| 197 | } |
| 198 |