Move custom libs to the source tree

This commit is contained in:
Frédéric Guillot
2018-04-04 15:21:13 -07:00
parent 62178b1f2b
commit a4642d17e0
42 changed files with 96 additions and 668 deletions

View File

@@ -0,0 +1,138 @@
<?php
namespace SimpleQueue\Adapter;
use DateTime;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use SimpleQueue\Job;
use SimpleQueue\QueueAdapterInterface;
/**
* Class AmqpQueueAdapter
*
* @package SimpleQueue\Adapter
*/
class AmqpQueueAdapter implements QueueAdapterInterface
{
/**
* @var AMQPChannel
*/
protected $channel;
/**
* @var string
*/
protected $exchange = '';
/**
* @var string
*/
protected $queue = '';
/**
* AmqpQueueAdapter constructor.
*
* @param AMQPChannel $channel
* @param string $queue
* @param string $exchange
*/
public function __construct(AMQPChannel $channel, $queue, $exchange)
{
$this->channel = $channel;
$this->exchange = $exchange;
$this->queue = $queue;
}
/**
* Send a job
*
* @access public
* @param Job $job
* @return $this
*/
public function push(Job $job)
{
$message = new AMQPMessage($job->serialize(), array('content_type' => 'text/plain'));
$this->channel->basic_publish($message, $this->exchange);
return $this;
}
/**
* Schedule a job in the future
*
* @access public
* @param Job $job
* @param DateTime $dateTime
* @return $this
*/
public function schedule(Job $job, DateTime $dateTime)
{
$now = new DateTime();
$when = clone($dateTime);
$delay = $when->getTimestamp() - $now->getTimestamp();
$message = new AMQPMessage($job->serialize(), array('delivery_mode' => 2));
$message->set('application_headers', new AMQPTable(array('x-delay' => $delay)));
$this->channel->basic_publish($message, $this->exchange);
return $this;
}
/**
* Wait and get job from a queue
*
* @access public
* @return Job|null
*/
public function pull()
{
$message = null;
$this->channel->basic_consume($this->queue, 'test', false, false, false, false, function ($msg) use (&$message) {
$message = $msg;
$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
});
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
if ($message === null) {
return null;
}
$job = new Job();
$job->setId($message->get('delivery_tag'));
$job->unserialize($message->getBody());
return $job;
}
/**
* Acknowledge a job
*
* @access public
* @param Job $job
* @return $this
*/
public function completed(Job $job)
{
$this->channel->basic_ack($job->getId());
return $this;
}
/**
* Mark a job as failed
*
* @access public
* @param Job $job
* @return $this
*/
public function failed(Job $job)
{
$this->channel->basic_nack($job->getId());
return $this;
}
}

View File

@@ -0,0 +1,120 @@
<?php
namespace SimpleQueue\Adapter;
use DateTime;
use Pheanstalk\Job as BeanstalkJob;
use Pheanstalk\Pheanstalk;
use Pheanstalk\PheanstalkInterface;
use SimpleQueue\Job;
use SimpleQueue\QueueAdapterInterface;
/**
* Class BeanstalkQueueAdapter
*
* @package SimpleQueue\Adapter
*/
class BeanstalkQueueAdapter implements QueueAdapterInterface
{
/**
* @var PheanstalkInterface
*/
protected $beanstalk;
/**
* @var string
*/
protected $queueName = '';
/**
* BeanstalkQueueAdapter constructor.
*
* @param PheanstalkInterface $beanstalk
* @param string $queueName
*/
public function __construct(PheanstalkInterface $beanstalk, $queueName)
{
$this->beanstalk = $beanstalk;
$this->queueName = $queueName;
}
/**
* Send a job
*
* @access public
* @param Job $job
* @return $this
*/
public function push(Job $job)
{
$this->beanstalk->putInTube($this->queueName, $job->serialize());
return $this;
}
/**
* Schedule a job in the future
*
* @access public
* @param Job $job
* @param DateTime $dateTime
* @return $this
*/
public function schedule(Job $job, DateTime $dateTime)
{
$now = new DateTime();
$when = clone($dateTime);
$delay = $when->getTimestamp() - $now->getTimestamp();
$this->beanstalk->putInTube($this->queueName, $job->serialize(), Pheanstalk::DEFAULT_PRIORITY, $delay);
return $this;
}
/**
* Wait and get job from a queue
*
* @access public
* @return Job|null
*/
public function pull()
{
$beanstalkJob = $this->beanstalk->reserveFromTube($this->queueName);
if ($beanstalkJob === false) {
return null;
}
$job = new Job();
$job->setId($beanstalkJob->getId());
$job->unserialize($beanstalkJob->getData());
return $job;
}
/**
* Acknowledge a job
*
* @access public
* @param Job $job
* @return $this
*/
public function completed(Job $job)
{
$beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize());
$this->beanstalk->delete($beanstalkJob);
return $this;
}
/**
* Mark a job as failed
*
* @access public
* @param Job $job
* @return $this
*/
public function failed(Job $job)
{
$beanstalkJob = new BeanstalkJob($job->getId(), $job->serialize());
$this->beanstalk->bury($beanstalkJob);
return $this;
}
}

View File

@@ -0,0 +1,14 @@
<?php
namespace SimpleQueue\Exception;
use Exception;
/**
* Class NotSupportedException
*
* @package SimpleQueue\Exception
*/
class NotSupportedException extends Exception
{
}

98
libs/SimpleQueue/Job.php Normal file
View File

@@ -0,0 +1,98 @@
<?php
namespace SimpleQueue;
/**
* Class Job
*
* @package SimpleQueue
*/
class Job
{
protected $id;
protected $body;
/**
* Job constructor.
*
* @param null $body
* @param null $id
*/
public function __construct($body = null, $id = null)
{
$this->body = $body;
$this->id = $id;
}
/**
* Unserialize a payload
*
* @param string $payload
* @return $this
*/
public function unserialize($payload)
{
$this->body = json_decode($payload, true);
return $this;
}
/**
* Serialize the body
*
* @return string
*/
public function serialize()
{
return json_encode($this->body);
}
/**
* Set body
*
* @param mixed $body
* @return Job
*/
public function setBody($body)
{
$this->body = $body;
return $this;
}
/**
* Get body
*
* @return mixed
*/
public function getBody()
{
return $this->body;
}
/**
* Set job ID
*
* @param mixed $jobId
* @return Job
*/
public function setId($jobId)
{
$this->id = $jobId;
return $this;
}
/**
* Get job ID
* @return mixed
*/
public function getId()
{
return $this->id;
}
/**
* Execute job
*/
public function execute()
{
}
}

View File

@@ -0,0 +1,92 @@
<?php
namespace SimpleQueue;
use DateTime;
/**
* Class Queue
*
* @package SimpleQueue
*/
class Queue implements QueueAdapterInterface
{
/**
* @var QueueAdapterInterface
*/
protected $queueAdapter;
/**
* Queue constructor.
*
* @param QueueAdapterInterface $queueAdapter
*/
public function __construct(QueueAdapterInterface $queueAdapter)
{
$this->queueAdapter = $queueAdapter;
}
/**
* Send a job
*
* @access public
* @param Job $job
* @return $this
*/
public function push(Job $job)
{
$this->queueAdapter->push($job);
return $this;
}
/**
* Schedule a job in the future
*
* @access public
* @param Job $job
* @param DateTime $dateTime
* @return $this
*/
public function schedule(Job $job, DateTime $dateTime)
{
$this->queueAdapter->schedule($job, $dateTime);
return $this;
}
/**
* Wait and get job from a queue
*
* @access public
* @return Job|null
*/
public function pull()
{
return $this->queueAdapter->pull();
}
/**
* Acknowledge a job
*
* @access public
* @param Job $job
* @return $this
*/
public function completed(Job $job)
{
$this->queueAdapter->completed($job);
return $this;
}
/**
* Mark a job as failed
*
* @access public
* @param Job $job
* @return $this
*/
public function failed(Job $job)
{
$this->queueAdapter->failed($job);
return $this;
}
}

View File

@@ -0,0 +1,58 @@
<?php
namespace SimpleQueue;
use DateTime;
/**
* Interface AdapterInterface
*
* @package SimpleQueue\Adapter
*/
interface QueueAdapterInterface
{
/**
* Send a job
*
* @access public
* @param Job $job
* @return $this
*/
public function push(Job $job);
/**
* Schedule a job in the future
*
* @access public
* @param Job $job
* @param DateTime $dateTime
* @return $this
*/
public function schedule(Job $job, DateTime $dateTime);
/**
* Wait and get job from a queue
*
* @access public
* @return Job|null
*/
public function pull();
/**
* Acknowledge a job
*
* @access public
* @param Job $job
* @return $this
*/
public function completed(Job $job);
/**
* Mark a job as failed
*
* @access public
* @param Job $job
* @return $this
*/
public function failed(Job $job);
}