Added QueueManager to process background jobs

This commit is contained in:
Frederic Guillot 2016-05-23 20:43:51 -04:00
parent dc6176fca2
commit 8314c99b56
29 changed files with 427 additions and 34 deletions

View File

@ -0,0 +1,28 @@
<?php
namespace Kanboard\Console;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
/**
* Class WorkerCommand
*
* @package Kanboard\Console
* @author Frederic Guillot
*/
class WorkerCommand extends BaseCommand
{
protected function configure()
{
$this
->setName('worker')
->setDescription('Execute queue worker')
;
}
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->queueManager->listen();
}
}

View File

@ -27,6 +27,7 @@ use Pimple\Container;
* @property \Kanboard\Core\Http\Response $response
* @property \Kanboard\Core\Http\Router $router
* @property \Kanboard\Core\Http\Route $route
* @property \Kanboard\Core\Queue\QueueManager $queueManager
* @property \Kanboard\Core\Mail\Client $emailClient
* @property \Kanboard\Core\ObjectStorage\ObjectStorageInterface $objectStorage
* @property \Kanboard\Core\Plugin\Hook $hook

View File

@ -2,6 +2,7 @@
namespace Kanboard\Core\Mail;
use Kanboard\Job\EmailJob;
use Pimple\Container;
use Kanboard\Core\Base;
@ -46,25 +47,31 @@ class Client extends Base
public function send($email, $name, $subject, $html)
{
if (! empty($email)) {
$this->logger->debug('Sending email to '.$email.' ('.MAIL_TRANSPORT.')');
$start_time = microtime(true);
$author = 'Kanboard';
if ($this->userSession->isLogged()) {
$author = e('%s via Kanboard', $this->helper->user->getFullname());
}
$this->getTransport(MAIL_TRANSPORT)->sendEmail($email, $name, $subject, $html, $author);
if (DEBUG) {
$this->logger->debug('Email sent in '.round(microtime(true) - $start_time, 6).' seconds');
}
$this->queueManager->push(EmailJob::getInstance($this->container)
->withParams($email, $name, $subject, $html, $this->getAuthor())
);
}
return $this;
}
/**
* Get email author
*
* @access public
* @return string
*/
public function getAuthor()
{
$author = 'Kanboard';
if ($this->userSession->isLogged()) {
$author = e('%s via Kanboard', $this->helper->user->getFullname());
}
return $author;
}
/**
* Get mail transport instance
*

View File

@ -0,0 +1,50 @@
<?php
namespace Kanboard\Core\Queue;
use Kanboard\Core\Base;
use Kanboard\Job\BaseJob;
use SimpleQueue\Job;
/**
* Class JobHandler
*
* @package Kanboard\Core\Queue
* @author Frederic Guillot
*/
class JobHandler extends Base
{
/**
* Serialize a job
*
* @access public
* @param BaseJob $job
* @return Job
*/
public function serializeJob(BaseJob $job)
{
return new Job(array(
'class' => get_class($job),
'params' => $job->getJobParams(),
));
}
/**
* Execute a job
*
* @access public
* @param Job $job
*/
public function executeJob(Job $job)
{
$payload = $job->getBody();
$className = $payload['class'];
if (DEBUG) {
$this->logger->debug(__METHOD__.' Received job => '.$className);
}
$worker = new $className($this->container);
call_user_func_array(array($worker, 'execute'), $payload['params']);
}
}

View File

@ -0,0 +1,71 @@
<?php
namespace Kanboard\Core\Queue;
use Kanboard\Core\Base;
use Kanboard\Job\BaseJob;
use LogicException;
use SimpleQueue\Queue;
/**
* Class QueueManager
*
* @package Kanboard\Core\Queue
* @author Frederic Guillot
*/
class QueueManager extends Base
{
/**
* @var Queue
*/
protected $queue = null;
/**
* Set queue driver
*
* @access public
* @param Queue $queue
* @return $this
*/
public function setQueue(Queue $queue)
{
$this->queue = $queue;
return $this;
}
/**
* Send a new job to the queue
*
* @access public
* @param BaseJob $job
* @return $this
*/
public function push(BaseJob $job)
{
if ($this->queue !== null) {
$this->queue->push(JobHandler::getInstance($this->container)->serializeJob($job));
} else {
call_user_func_array(array($job, 'execute'), $job->getJobParams());
}
return $this;
}
/**
* Wait for new jobs
*
* @access public
* @throws LogicException
*/
public function listen()
{
if ($this->queue === null) {
throw new LogicException('No Queue Driver defined!');
}
while ($job = $this->queue->pull()) {
JobHandler::getInstance($this->container)->executeJob($job);
$this->queue->completed($job);
}
}
}

33
app/Job/BaseJob.php Normal file
View File

@ -0,0 +1,33 @@
<?php
namespace Kanboard\Job;
use Kanboard\Core\Base;
/**
* Class BaseJob
*
* @package Kanboard\Job
* @author Frederic Guillot
*/
abstract class BaseJob extends Base
{
/**
* Job parameters
*
* @access protected
* @var array
*/
protected $jobParams = array();
/**
* Get job parameters
*
* @access public
* @return array
*/
public function getJobParams()
{
return $this->jobParams;
}
}

54
app/Job/EmailJob.php Normal file
View File

@ -0,0 +1,54 @@
<?php
namespace Kanboard\Job;
/**
* Class EmailJob
*
* @package Kanboard\Job
* @author Frederic Guillot
*/
class EmailJob extends BaseJob
{
/**
* Set job parameters
*
* @access public
* @param string $email
* @param string $name
* @param string $subject
* @param string $html
* @param string $author
* @return $this
*/
public function withParams($email, $name, $subject, $html, $author)
{
$this->jobParams = array($email, $name, $subject, $html, $author);
return $this;
}
/**
* Execute job
*
* @access public
* @param string $email
* @param string $name
* @param string $subject
* @param string $html
* @param string $author
*/
public function execute($email, $name, $subject, $html, $author)
{
$this->logger->debug(__METHOD__.' Sending email to '.$email.' via '.MAIL_TRANSPORT);
$startTime = microtime(true);
$this->emailClient
->getTransport(MAIL_TRANSPORT)
->sendEmail($email, $name, $subject, $html, $author)
;
if (DEBUG) {
$this->logger->debug('Email sent in '.round(microtime(true) - $startTime, 6).' seconds');
}
}
}

View File

@ -0,0 +1,40 @@
<?php
namespace Kanboard\Job;
/**
* Class ProjectMetricJob
*
* @package Kanboard\Job
* @author Frederic Guillot
*/
class ProjectMetricJob extends BaseJob
{
/**
* Set job parameters
*
* @access public
* @param integer $projectId
* @return $this
*/
public function withParams($projectId)
{
$this->jobParams = array($projectId);
return $this;
}
/**
* Execute job
*
* @access public
* @param integer $projectId
*/
public function execute($projectId)
{
$this->logger->debug(__METHOD__.' Run project metrics calculation');
$now = date('Y-m-d');
$this->projectDailyColumnStats->updateTotals($projectId, $now);
$this->projectDailyStats->updateTotals($projectId, $now);
}
}

View File

@ -36,7 +36,7 @@ use Kanboard\Action\TaskCloseNoActivity;
/**
* Action Provider
*
* @package serviceProvider
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class ActionProvider implements ServiceProviderInterface

View File

@ -17,7 +17,7 @@ use Kanboard\Auth\ReverseProxyAuth;
/**
* Authentication Provider
*
* @package serviceProvider
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class AuthenticationProvider implements ServiceProviderInterface

View File

@ -12,7 +12,7 @@ use Kanboard\User\Avatar\LetterAvatarProvider;
/**
* Avatar Provider
*
* @package serviceProvider
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class AvatarProvider implements ServiceProviderInterface

View File

@ -11,6 +11,12 @@ use Kanboard\Core\Http\OAuth2;
use Kanboard\Core\Tool;
use Kanboard\Core\Http\Client as HttpClient;
/**
* Class ClassProvider
*
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class ClassProvider implements ServiceProviderInterface
{
private $classes = array(

View File

@ -8,6 +8,12 @@ use Pimple\Container;
use Pimple\ServiceProviderInterface;
use PicoDb\Database;
/**
* Class DatabaseProvider
*
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class DatabaseProvider implements ServiceProviderInterface
{
/**

View File

@ -15,6 +15,12 @@ use Kanboard\Subscriber\SubtaskTimeTrackingSubscriber;
use Kanboard\Subscriber\TransitionSubscriber;
use Kanboard\Subscriber\RecurringTaskSubscriber;
/**
* Class EventDispatcherProvider
*
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class EventDispatcherProvider implements ServiceProviderInterface
{
public function register(Container $container)

View File

@ -12,7 +12,7 @@ use Kanboard\ExternalLink\FileLinkProvider;
/**
* External Link Provider
*
* @package serviceProvider
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class ExternalLinkProvider implements ServiceProviderInterface

View File

@ -37,7 +37,7 @@ use Pimple\ServiceProviderInterface;
/**
* Filter Provider
*
* @package serviceProvider
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class FilterProvider implements ServiceProviderInterface

View File

@ -11,7 +11,7 @@ use Kanboard\Group\LdapBackendGroupProvider;
/**
* Group Provider
*
* @package serviceProvider
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class GroupProvider implements ServiceProviderInterface

View File

@ -7,6 +7,12 @@ use Kanboard\Core\Template;
use Pimple\Container;
use Pimple\ServiceProviderInterface;
/**
* Class HelperProvider
*
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class HelperProvider implements ServiceProviderInterface
{
public function register(Container $container)

View File

@ -11,6 +11,12 @@ use SimpleLogger\Stdout;
use SimpleLogger\Syslog;
use SimpleLogger\File;
/**
* Class LoggingProvider
*
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class LoggingProvider implements ServiceProviderInterface
{
public function register(Container $container)

View File

@ -12,7 +12,7 @@ use Kanboard\Notification\Web as WebNotification;
/**
* Notification Provider
*
* @package serviceProvider
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class NotificationProvider implements ServiceProviderInterface

View File

@ -9,7 +9,7 @@ use Kanboard\Core\Plugin\Loader;
/**
* Plugin Provider
*
* @package serviceProvider
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class PluginProvider implements ServiceProviderInterface

View File

@ -0,0 +1,27 @@
<?php
namespace Kanboard\ServiceProvider;
use Kanboard\Core\Queue\QueueManager;
use Pimple\Container;
use Pimple\ServiceProviderInterface;
/**
* Class QueueProvider
*
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class QueueProvider implements ServiceProviderInterface
{
/**
* Registers services on the given container.
*
* @param Container $container
*/
public function register(Container $container)
{
$container['queueManager'] = new QueueManager($container);
return $container;
}
}

View File

@ -10,7 +10,7 @@ use Kanboard\Core\Http\Router;
/**
* Route Provider
*
* @package serviceProvider
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class RouteProvider implements ServiceProviderInterface

View File

@ -11,7 +11,7 @@ use Kanboard\Core\Session\FlashMessage;
/**
* Session Provider
*
* @package serviceProvider
* @package Kanboard\ServiceProvider
* @author Frederic Guillot
*/
class SessionProvider implements ServiceProviderInterface

View File

@ -3,6 +3,7 @@
namespace Kanboard\Subscriber;
use Kanboard\Event\TaskEvent;
use Kanboard\Job\ProjectMetricJob;
use Kanboard\Model\Task;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
@ -23,8 +24,7 @@ class ProjectDailySummarySubscriber extends BaseSubscriber implements EventSubsc
{
if (isset($event['project_id']) && !$this->isExecuted()) {
$this->logger->debug('Subscriber executed: '.__METHOD__);
$this->projectDailyColumnStats->updateTotals($event['project_id'], date('Y-m-d'));
$this->projectDailyStats->updateTotals($event['project_id'], date('Y-m-d'));
$this->queueManager->push(ProjectMetricJob::getInstance($this->container)->withParams($event['project_id']));
}
}
}

View File

@ -45,4 +45,5 @@ $container->register(new Kanboard\ServiceProvider\ActionProvider());
$container->register(new Kanboard\ServiceProvider\ExternalLinkProvider());
$container->register(new Kanboard\ServiceProvider\AvatarProvider());
$container->register(new Kanboard\ServiceProvider\FilterProvider());
$container->register(new Kanboard\ServiceProvider\QueueProvider());
$container->register(new Kanboard\ServiceProvider\PluginProvider());

View File

@ -30,6 +30,7 @@
"fguillot/picodb" : "1.0.11",
"fguillot/simpleLogger" : "1.0.1",
"fguillot/simple-validator" : "1.0.0",
"fguillot/simple-queue" : "dev-master",
"paragonie/random_compat": "@stable",
"pimple/pimple" : "~3.0",
"ramsey/array_column": "@stable",

64
composer.lock generated
View File

@ -4,8 +4,8 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file",
"This file is @generated automatically"
],
"hash": "7679ea6537bad9f8be67619d321a72f9",
"content-hash": "ef2d3ad0af1dcad85710d537150ec151",
"hash": "0c3cc3fb800c021f6829868a36cc0f1e",
"content-hash": "02f905c6f442f47221f2b9f5b9f05766",
"packages": [
{
"name": "christian-riesen/base32",
@ -274,6 +274,53 @@
"homepage": "https://github.com/fguillot/picoDb",
"time": "2016-05-15 01:02:48"
},
{
"name": "fguillot/simple-queue",
"version": "dev-master",
"source": {
"type": "git",
"url": "https://github.com/fguillot/simple-queue.git",
"reference": "ce5a4c502feb619a7adffc0d39dfd10b00ca9623"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/fguillot/simple-queue/zipball/ce5a4c502feb619a7adffc0d39dfd10b00ca9623",
"reference": "ce5a4c502feb619a7adffc0d39dfd10b00ca9623",
"shasum": ""
},
"require": {
"php": ">=5.3.3"
},
"require-dev": {
"mariano/disque-php": "~2.0",
"pda/pheanstalk": "~3.0",
"php-amqplib/php-amqplib": "2.6.*",
"phpunit/phpunit": "5.3.*"
},
"suggest": {
"mariano/disque-php": "Required to use the Disque queue driver (~2.0).",
"pda/pheanstalk": "Required to use the Beanstalk queue driver (~3.0).",
"php-amqplib/php-amqplib": "Required to use the RabbitMQ queue driver (2.6.*)."
},
"type": "library",
"autoload": {
"psr-4": {
"SimpleQueue\\": "src/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Frédéric Guillot"
}
],
"description": "Abstraction layer for multiple queue systems",
"homepage": "https://github.com/fguillot/SimpleQueue",
"time": "2016-05-23 18:42:15"
},
{
"name": "fguillot/simple-validator",
"version": "1.0.0",
@ -747,16 +794,16 @@
},
{
"name": "symfony/polyfill-mbstring",
"version": "v1.1.1",
"version": "v1.2.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-mbstring.git",
"reference": "1289d16209491b584839022f29257ad859b8532d"
"reference": "dff51f72b0706335131b00a7f49606168c582594"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/1289d16209491b584839022f29257ad859b8532d",
"reference": "1289d16209491b584839022f29257ad859b8532d",
"url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/dff51f72b0706335131b00a7f49606168c582594",
"reference": "dff51f72b0706335131b00a7f49606168c582594",
"shasum": ""
},
"require": {
@ -768,7 +815,7 @@
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "1.1-dev"
"dev-master": "1.2-dev"
}
},
"autoload": {
@ -802,7 +849,7 @@
"portable",
"shim"
],
"time": "2016-01-20 09:13:37"
"time": "2016-05-18 14:26:46"
}
],
"packages-dev": [
@ -859,6 +906,7 @@
"aliases": [],
"minimum-stability": "stable",
"stability-flags": {
"fguillot/simple-queue": 20,
"paragonie/random_compat": 0,
"ramsey/array_column": 0
},

View File

@ -6,6 +6,7 @@ use Kanboard\Console\PluginUninstallCommand;
use Kanboard\Console\PluginUpgradeCommand;
use Kanboard\Console\ResetPasswordCommand;
use Kanboard\Console\ResetTwoFactorCommand;
use Kanboard\Console\WorkerCommand;
use Symfony\Component\Console\Application;
use Symfony\Component\EventDispatcher\Event;
use Kanboard\Console\TaskOverdueNotificationCommand;
@ -37,6 +38,7 @@ try {
$application->add(new LocaleComparatorCommand($container));
$application->add(new TaskTriggerCommand($container));
$application->add(new CronjobCommand($container));
$application->add(new WorkerCommand($container));
$application->add(new ResetPasswordCommand($container));
$application->add(new ResetTwoFactorCommand($container));
$application->add(new PluginUpgradeCommand($container));