How to schedule a Symfony Console App execition using supervidord? Not using cron job


Introduction

Scheduling a periodic execution of a Symfony command is a commonly used task. The jobs to be scheduled are diverse, either performing inspections of fields into a databse, sending email notifications to certain recipients on certain days within the week, scanning parameters at certain hours every specific days into the week and ... a long etc. 

Problem 

 How to schedule a weekly Symfony command execution?

Solutions

Cron job: Often this kind of situations are performed by using a cron job. 

It's easy to implement but has some downsides. Let's consider the following failure scenarios:

  • The cron job fails at the moment of the execution for any reason.
  • The executed task throws an exception at the same time the cron job is configured to execute.

For those cases, the task would not be run again until the next week, leaving the data unprocessed for 15 days. 

Supervisord

An alternative is to create a mechanism, similar to a cron job, by using a long-running process managed by supervisord.

The mechanism would be to execute the first inspection, and every 10 minutes to execute an inspection until reaching a total of 10. The fecuency will be Ten inspections weekly (1 every 10 minutes) .

Those additional inspections are executed for retrying purposes if the previous is not able to build a successful result. 

 






 

 

Creating the supervisord listener 

Supervisord has a very useful tool on Supervisord Event Listeners. It allows listening to system events by implementing a listener in many languages and execute a task (in our case our Symfony command app) once an event is triggered.
http://supervisord.org/events.html#events . Supervidord has an event called TICK_60 that may be subscribed to for event listeners to receive “wake-up” notifications every 60 seconds.


[eventlistener:name_of_the_program]
command=php /var/www/symfony/src/path/supervisord_listener.php %(ENV_APP_ENV)s
events=TICK_60,PROCESS_STATE_RUNNING
user=apache
process_name=%(program_name)s-%(process_num)s
numprocs=1

 

Implementing the Listener in php

<?php

$env = $argv[1];

function listenEvent($callback): void
{
fwrite(STDOUT, "READY\n");

while (true) {
// Sanitizing supervisor event token: Token is sent though STDIN resource.
if (!$token = trim(fgets(STDIN))) {
break;
}

// Getting token as array (key => value)
$headers = parseHeader($token);
if (!array_key_exists('eventname', $headers)) {
break;
}

// Sanitizing supervisor event token: Listening only the supported events.
$eventName = $headers['eventname'];
$supportedEvents = ['TICK_60', 'PROCESS_STATE_RUNNING'];
if (!in_array($eventName, $supportedEvents)) {
break;
}

// Executing task
$result = call_user_func($callback, $eventName);

// Switching to supervisord state according task execution result.
if (true === $result) {
fwrite(STDOUT, "RESULT 2\nOK");
} elseif (false === $result) {
fwrite(STDOUT, "RESULT 4\nFAIL");
} else {
break;
}

fwrite(STDOUT, "READY\n");
}
}

$taskExecutor = function (string $event) use ($env) {
// Generating the System's id for the shared memory blocks.
$ticksSysId = ftok(__FILE__, 't');
$throttlingSysId = ftok(__FILE__, 'h');

// Once supervisord process starts running this prepares shared memory blocks to save auxiliary counters.
if ('PROCESS_STATE_RUNNING' === $event) {
$ticksResourceId = shmop_open($ticksSysId, "c", 0600, 1);
$throttlingResourceId = shmop_open($throttlingSysId, "c", 0600, 1008);
flushCounter($ticksResourceId);
flushCounter($throttlingResourceId);

return true;
}

// Stating opening the shared memory block to save the counter for TICK_60 event. Counter of 10 minutes.
$ticksResourceId = shmop_open($ticksSysId, "c", 0600, 1);
$ticks = incrementCounter($ticksResourceId, 1);

$maxTens = 10; // 10 tens represents 10x60 seconds. 10 Minutes.
$maxExecutions = 10; // Maximum 10 executions of the task (the symfony console app) within q singe week.
$tensInWeek = 1008; // Count of tens within a week. 60480/600 = 1008. Once arrive to it a week is passed.

if ($maxTens === $ticks) {
// Flushing ticks storage. Getting ready to count others 10 TICK_60 events more.
flushCounter($ticksResourceId);

// Controlling executions counter. No more than 10 per week. If so, execute, otherwise skip.
$throttlingResourceId = shmop_open($throttlingSysId, "c", 0600, 1008);
$executionCount = incrementCounter($throttlingResourceId, 1008);
$notMaxedExecutions = $executionCount <= $maxExecutions;
$weekArrived = $tensInWeek === $executionCount;

// If the max rate of execution within a week hasn't passed yet, execute. Otherwise counter is only increased
// within the respective shared memory block.
if ($notMaxedExecutions) {
putenv('APP_ENV=' . $env);
exec('php /path/to/symfony/bin/console command_name');
}

// If a week has passed then restart the counter of executed inspections and restart to execute again.
// It's done by restarting the respective counter within the respective shared memory block.
if ($weekArrived) {
flushCounter($throttlingResourceId);
}
}

return true;
};

function parseHeader(string $headerToken): array
{
$parsed = [];
foreach (explode(' ', $headerToken) as $pair) {
if (!strpos($pair, ':')) {
continue;
}

list($key, $value) = array_map('trim', explode(':', $pair));
$parsed[$key] = $value;
}

return $parsed;
}

function incrementCounter($resourceId, $size): int
{
$value = (int)shmop_read($resourceId, 0, $size);
if (!$value) {
$value = 0;
}
$value += 1;
shmop_write($resourceId, $value, 0);

return $value;
}

function flushCounter($resourceId): void
{
shmop_delete($resourceId);
shmop_close($resourceId);
}


listenEvent($taskExecutor);

 

View on GitHub

Comments

Popular posts from this blog

Azure Blob Storage. Overview

Redis as cache for excel generation using Phpspreadsheet

How long Apache tooks to process a request?