You've already forked ugly-queue
Lots of cleanup, removing of dumb code. More to do.
This commit is contained in:
@@ -5,80 +5,82 @@ use DCarbone\Helpers\FileHelper;
|
||||
/**
|
||||
* Class UglyQueue
|
||||
* @package DCarbone
|
||||
*
|
||||
* @property string name
|
||||
* @property string path
|
||||
* @property bool locked
|
||||
*/
|
||||
class UglyQueue implements \Serializable, \SplSubject, \Countable
|
||||
{
|
||||
/** @var int */
|
||||
public $notifyStatus;
|
||||
|
||||
const QUEUE_READONLY = 0;
|
||||
const QUEUE_READWRITE = 1;
|
||||
|
||||
/** @var array */
|
||||
private $observers = array();
|
||||
/** @var int */
|
||||
private $_notifyStatus;
|
||||
|
||||
/** @var \SplObserver[] */
|
||||
private $_observers = array();
|
||||
|
||||
/** @var int */
|
||||
protected $mode = null;
|
||||
|
||||
/** @var string */
|
||||
protected $_name;
|
||||
protected $baseDir;
|
||||
|
||||
/** @var string */
|
||||
protected $_path;
|
||||
protected $name;
|
||||
|
||||
/** @var string */
|
||||
protected $path;
|
||||
|
||||
/** @var bool */
|
||||
protected $_locked = false;
|
||||
protected $locked = false;
|
||||
|
||||
/** @var resource */
|
||||
protected $_tmpHandle;
|
||||
protected $tmpHandle;
|
||||
|
||||
/** @var string */
|
||||
protected $queueFile;
|
||||
|
||||
/** @var string */
|
||||
protected $queueTmpFile;
|
||||
|
||||
/** @var string */
|
||||
protected $lockFile;
|
||||
|
||||
/**
|
||||
* @param string $directoryPath
|
||||
* @param array $observers
|
||||
* @throws \RuntimeException
|
||||
* @throws \InvalidArgumentException
|
||||
* @return UglyQueue
|
||||
* @param string $baseDir
|
||||
* @param string $queueName
|
||||
* @param \SplObserver[] $observers
|
||||
*/
|
||||
public static function queueWithDirectoryPathAndObservers($directoryPath, array $observers = array())
|
||||
public function __construct($baseDir, $queueName, array $observers = array())
|
||||
{
|
||||
if (!is_string($directoryPath))
|
||||
throw new \InvalidArgumentException('Argument 1 expected to be string, '.gettype($directoryPath).' seen');
|
||||
$this->baseDir = trim($baseDir, "/\\");
|
||||
$this->name = $queueName;
|
||||
$this->_observers = $observers;
|
||||
|
||||
if (($directoryPath = trim($directoryPath)) === '')
|
||||
throw new \InvalidArgumentException('Empty string passed for argument 1');
|
||||
$path = sprintf('%s/%s', $baseDir, $queueName);
|
||||
if (!file_exists($path) && !@mkdir($path))
|
||||
throw new \RuntimeException('Unable to initialize queue directory "'.$path.'". Please check permissions.');
|
||||
|
||||
if (file_exists($directoryPath))
|
||||
$this->path = $path;
|
||||
$this->lockFile = sprintf('%s/queue.lock', $this->path);
|
||||
$this->queueFile = sprintf('%s/queue.txt', $this->path);
|
||||
$this->queueTmpFile = sprintf('%s/queue.tmp', $this->path);
|
||||
|
||||
$this->_initialize();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize queue if needed.
|
||||
*/
|
||||
private function _initialize()
|
||||
{
|
||||
if (is_readable($this->path) && is_writable($this->path))
|
||||
$this->mode = self::QUEUE_READWRITE;
|
||||
else if (is_readable($this->path))
|
||||
$this->mode = self::QUEUE_READONLY;
|
||||
|
||||
if (!file_exists($this->path.'index.html'))
|
||||
{
|
||||
if (!is_dir($directoryPath))
|
||||
throw new \RuntimeException('Argument 1 expected to be path to directory, path to non-directory seen');
|
||||
}
|
||||
else if (!@mkdir($directoryPath))
|
||||
{
|
||||
throw new \RuntimeException('Unable to create queue directory at path: "'.$directoryPath.'".');
|
||||
}
|
||||
|
||||
$uglyQueue = new UglyQueue();
|
||||
$uglyQueue->observers = $observers;
|
||||
|
||||
$split = preg_split('#[/\\\]+#', $directoryPath);
|
||||
|
||||
$uglyQueue->_name = end($split);
|
||||
$uglyQueue->_path = rtrim(realpath(implode(DIRECTORY_SEPARATOR, $split)), "/\\").DIRECTORY_SEPARATOR;
|
||||
|
||||
if (is_writable($uglyQueue->_path))
|
||||
$uglyQueue->mode = self::QUEUE_READWRITE;
|
||||
else if (is_readable($uglyQueue->_path))
|
||||
$uglyQueue->mode = self::QUEUE_READONLY;
|
||||
|
||||
// Insert "don't look here" index.html file
|
||||
if (!file_exists($uglyQueue->_path.'index.html'))
|
||||
{
|
||||
if ($uglyQueue->mode === self::QUEUE_READONLY)
|
||||
throw new \RuntimeException('Cannot initialize queue with name "'.$uglyQueue->_name.'", the user lacks permission to create files.');
|
||||
if ($this->mode === self::QUEUE_READONLY)
|
||||
throw new \RuntimeException('Cannot initialize queue with name "'.$this->name.'", the user lacks permission to create files.');
|
||||
|
||||
$html = <<<HTML
|
||||
<html>
|
||||
@@ -90,44 +92,19 @@ class UglyQueue implements \Serializable, \SplSubject, \Countable
|
||||
</body>
|
||||
</html>
|
||||
HTML;
|
||||
file_put_contents($uglyQueue->_path.'index.html', $html);
|
||||
file_put_contents($this->path.'index.html', $html);
|
||||
}
|
||||
|
||||
if (!file_exists($uglyQueue->_path.'queue.txt'))
|
||||
if (!file_exists($this->queueFile))
|
||||
{
|
||||
if ($uglyQueue->mode === self::QUEUE_READONLY)
|
||||
throw new \RuntimeException('Cannot initialize queue with name "'.$uglyQueue->_name.'", the user lacks permission to create files.');
|
||||
if ($this->mode === self::QUEUE_READONLY)
|
||||
throw new \RuntimeException('Cannot initialize queue with name "'.$this->name.'", the user lacks permission to create files.');
|
||||
|
||||
file_put_contents($uglyQueue->_path.'queue.txt', '');
|
||||
file_put_contents($this->queueFile, '');
|
||||
}
|
||||
|
||||
$uglyQueue->notifyStatus = UglyQueueEnum::QUEUE_INITIALIZED;
|
||||
$uglyQueue->notify();
|
||||
|
||||
return $uglyQueue;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $param
|
||||
* @return string
|
||||
* @throws \OutOfBoundsException
|
||||
*/
|
||||
public function __get($param)
|
||||
{
|
||||
switch($param)
|
||||
{
|
||||
case 'name' :
|
||||
return $this->_name;
|
||||
|
||||
case 'path':
|
||||
return $this->_path;
|
||||
|
||||
case 'locked':
|
||||
return $this->_locked;
|
||||
|
||||
default:
|
||||
throw new \OutOfBoundsException(get_class($this).' does not have a property named "'.$param.'".');
|
||||
}
|
||||
$this->_notifyStatus = UglyQueueEnum::QUEUE_INITIALIZED;
|
||||
$this->notify();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -137,7 +114,63 @@ HTML;
|
||||
{
|
||||
$this->_populateQueue();
|
||||
$this->unlock();
|
||||
file_put_contents($this->_path.UglyQueueManager::UGLY_QUEUE_SERIALIZED_NAME, serialize($this));
|
||||
file_put_contents($this->path.'/ugly-queue.obj', serialize($this));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getName()
|
||||
{
|
||||
return $this->name;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int
|
||||
*/
|
||||
public function getMode()
|
||||
{
|
||||
return $this->mode;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getPath()
|
||||
{
|
||||
return $this->path;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getBaseDir()
|
||||
{
|
||||
return $this->baseDir;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getQueueFile()
|
||||
{
|
||||
return $this->queueFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getQueueTmpFile()
|
||||
{
|
||||
return $this->queueTmpFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getLockFile()
|
||||
{
|
||||
return $this->lockFile;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -150,8 +183,8 @@ HTML;
|
||||
if (!is_int($ttl))
|
||||
throw new \InvalidArgumentException('Argument 1 expected to be integer, "'.gettype($ttl).'" seen');
|
||||
|
||||
if ($ttl < 0)
|
||||
throw new \InvalidArgumentException('Argument 1 expected to be positive integer, "'.$ttl.'" seen');
|
||||
if ($ttl < 1)
|
||||
throw new \InvalidArgumentException('Argument 1 expected to be greater than 0 "'.$ttl.'" seen');
|
||||
|
||||
$alreadyLocked = $this->isLocked();
|
||||
|
||||
@@ -160,8 +193,8 @@ HTML;
|
||||
return $this->createLockFile($ttl);
|
||||
|
||||
// If we make it this far, there is already a lock in place.
|
||||
$this->_locked = false;
|
||||
$this->notifyStatus = UglyQueueEnum::QUEUE_LOCKED_BY_OTHER_PROCESS;
|
||||
$this->locked = false;
|
||||
$this->_notifyStatus = UglyQueueEnum::QUEUE_LOCKED_BY_OTHER_PROCESS;
|
||||
$this->notify();
|
||||
|
||||
return false;
|
||||
@@ -174,19 +207,19 @@ HTML;
|
||||
protected function createLockFile($ttl)
|
||||
{
|
||||
$ok = (bool)@file_put_contents(
|
||||
$this->_path.'queue.lock',
|
||||
$this->lockFile,
|
||||
json_encode(array('ttl' => $ttl, 'born' => time())));
|
||||
|
||||
if ($ok !== true)
|
||||
{
|
||||
$this->notifyStatus = UglyQueueEnum::QUEUE_FAILED_TO_LOCK;
|
||||
$this->_notifyStatus = UglyQueueEnum::QUEUE_FAILED_TO_LOCK;
|
||||
$this->notify();
|
||||
return $this->_locked = false;
|
||||
return $this->locked = false;
|
||||
}
|
||||
|
||||
$this->_locked = true;
|
||||
$this->locked = true;
|
||||
|
||||
$this->notifyStatus = UglyQueueEnum::QUEUE_LOCKED;
|
||||
$this->_notifyStatus = UglyQueueEnum::QUEUE_LOCKED;
|
||||
$this->notify();
|
||||
|
||||
return true;
|
||||
@@ -197,12 +230,12 @@ HTML;
|
||||
*/
|
||||
public function unlock()
|
||||
{
|
||||
if ($this->_locked === true)
|
||||
if ($this->locked === true)
|
||||
{
|
||||
unlink($this->_path.'queue.lock');
|
||||
$this->_locked = false;
|
||||
unlink($this->lockFile);
|
||||
$this->locked = false;
|
||||
|
||||
$this->notifyStatus = UglyQueueEnum::QUEUE_UNLOCKED;
|
||||
$this->_notifyStatus = UglyQueueEnum::QUEUE_UNLOCKED;
|
||||
$this->notify();
|
||||
}
|
||||
}
|
||||
@@ -214,24 +247,23 @@ HTML;
|
||||
public function isLocked()
|
||||
{
|
||||
// First check for lock file
|
||||
if (is_file($this->_path.'queue.lock'))
|
||||
if (is_file($this->lockFile))
|
||||
{
|
||||
$lock = json_decode(file_get_contents($this->_path.'queue.lock'), true);
|
||||
$lock = json_decode(file_get_contents($this->lockFile), true);
|
||||
|
||||
// If we have an invalid lock structure.
|
||||
if (!isset($lock['ttl']) || !isset($lock['born']))
|
||||
throw new \RuntimeException('Invalid "queue.lock" file structure seen at "'.$this->_path.'queue.lock".');
|
||||
// If the decoded lock file contains a ttl and born value...
|
||||
if (isset($lock['ttl']) && isset($lock['born']))
|
||||
{
|
||||
$lock_ttl = ((int)$lock['born'] + (int)$lock['ttl']);
|
||||
|
||||
// Otherwise...
|
||||
$lock_ttl = ((int)$lock['born'] + (int)$lock['ttl']);
|
||||
|
||||
// If we're within the TTL of the lock, assume another thread is already processing.
|
||||
// We'll pick it up on the next go around.
|
||||
if ($lock_ttl > time())
|
||||
return true;
|
||||
// If we're within the TTL of the lock, assume another thread is already processing.
|
||||
// We'll pick it up on the next go around.
|
||||
if ($lock_ttl > time())
|
||||
return true;
|
||||
}
|
||||
|
||||
// Else, remove lock file and assume we're good to go!
|
||||
unlink($this->_path.'queue.lock');
|
||||
unlink($this->lockFile);
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -245,14 +277,14 @@ HTML;
|
||||
* @throws \InvalidArgumentException
|
||||
* @return bool|array
|
||||
*/
|
||||
public function processQueue($count = 1)
|
||||
public function retrieveItems($count = 1)
|
||||
{
|
||||
if ($this->mode === self::QUEUE_READONLY)
|
||||
throw new \RuntimeException('Queue "'.$this->_name.'" cannot be processed. It was started in Read-Only mode (the user running this process does not have permission to write to the queue directory).');
|
||||
throw new \RuntimeException('Queue "'.$this->name.'" cannot be processed. It was started in Read-Only mode (the user running this process does not have permission to write to the queue directory).');
|
||||
|
||||
// If we don't have a lock, assume issue and move on.
|
||||
if ($this->_locked === false)
|
||||
throw new \RuntimeException('Cannot process queue named "'.$this->_name.'". It is locked by another process.');
|
||||
if ($this->isLocked() === false)
|
||||
throw new \RuntimeException('Cannot process queue named "'.$this->name.'". It is locked by another process.');
|
||||
|
||||
// If non-int valid is passed
|
||||
if (!is_int($count))
|
||||
@@ -262,21 +294,21 @@ HTML;
|
||||
if ($count <= 0)
|
||||
throw new \InvalidArgumentException('Argument 1 expected to be integer greater than 0, "'.$count.'" seen');
|
||||
|
||||
if ($this->notifyStatus !== UglyQueueEnum::QUEUE_PROCESSING)
|
||||
if ($this->_notifyStatus !== UglyQueueEnum::QUEUE_PROCESSING)
|
||||
{
|
||||
$this->notifyStatus = UglyQueueEnum::QUEUE_PROCESSING;
|
||||
$this->_notifyStatus = UglyQueueEnum::QUEUE_PROCESSING;
|
||||
$this->notify();
|
||||
}
|
||||
|
||||
// Find number of lines in the queue file
|
||||
$lineCount = FileHelper::getLineCount($this->_path.'queue.txt');
|
||||
$lineCount = FileHelper::getLineCount($this->queueFile);
|
||||
|
||||
// If queue line count is 0, assume empty
|
||||
if ($lineCount === 0)
|
||||
return false;
|
||||
|
||||
// Try to open the file for reading / writing.
|
||||
$queueFileHandle = fopen($this->_path.'queue.txt', 'r+');
|
||||
$queueFileHandle = fopen($this->queueFile, 'r+');
|
||||
if ($queueFileHandle === false)
|
||||
$this->unlock();
|
||||
|
||||
@@ -300,13 +332,13 @@ HTML;
|
||||
ftruncate($queueFileHandle, 0);
|
||||
fclose($queueFileHandle);
|
||||
|
||||
$this->notifyStatus = UglyQueueEnum::QUEUE_REACHED_END;
|
||||
$this->_notifyStatus = UglyQueueEnum::QUEUE_REACHED_END;
|
||||
$this->notify();
|
||||
}
|
||||
// Otherwise, create new queue file minus the processed lines.
|
||||
else
|
||||
{
|
||||
$tmp = fopen($this->_path.'queue.tmp', 'w+');
|
||||
$tmp = fopen($this->queueTmpFile, 'w+');
|
||||
rewind($queueFileHandle);
|
||||
$i = 0;
|
||||
while (($line = fgets($queueFileHandle)) !== false && $i < $start_line)
|
||||
@@ -319,8 +351,8 @@ HTML;
|
||||
|
||||
fclose($queueFileHandle);
|
||||
fclose($tmp);
|
||||
unlink($this->_path.'queue.txt');
|
||||
rename($this->_path.'queue.tmp', $this->_path.'queue.txt');
|
||||
unlink($this->queueFile);
|
||||
rename($this->queueTmpFile, $this->queueFile);
|
||||
}
|
||||
|
||||
return $data;
|
||||
@@ -332,19 +364,19 @@ HTML;
|
||||
* @return bool
|
||||
* @throws \RuntimeException
|
||||
*/
|
||||
public function addToQueue($key, $value)
|
||||
public function addItem($key, $value)
|
||||
{
|
||||
if ($this->mode === self::QUEUE_READONLY)
|
||||
throw new \RuntimeException('Cannot add items to queue "'.$this->_name.'" as it is in read-only mode');
|
||||
throw new \RuntimeException('Cannot add item to queue "'.$this->name.'" as it is in read-only mode');
|
||||
|
||||
// If we don't have a lock, assume issue and move on.
|
||||
if ($this->_locked === false)
|
||||
throw new \RuntimeException('Cannot add items to queue "'.$this->_name.'". Queue is already locked by another process');
|
||||
if ($this->locked === false)
|
||||
throw new \RuntimeException('Cannot add item to queue "'.$this->name.'". Queue is already locked by another process');
|
||||
|
||||
if (!is_resource($this->_tmpHandle))
|
||||
if (!is_resource($this->tmpHandle))
|
||||
{
|
||||
$this->_tmpHandle = fopen($this->_path.'queue.tmp', 'w+');
|
||||
if ($this->_tmpHandle === false)
|
||||
$this->tmpHandle = fopen($this->queueTmpFile, 'w+');
|
||||
if ($this->tmpHandle === false)
|
||||
throw new \RuntimeException('Unable to create "queue.tmp" file.');
|
||||
}
|
||||
|
||||
@@ -352,7 +384,7 @@ HTML;
|
||||
$value = json_encode($value);
|
||||
|
||||
return (bool)fwrite(
|
||||
$this->_tmpHandle,
|
||||
$this->tmpHandle,
|
||||
$key."\t".str_replace(array("\r\n", "\n"), ' ', $value)
|
||||
."\n");
|
||||
}
|
||||
@@ -364,35 +396,26 @@ HTML;
|
||||
*/
|
||||
public function _populateQueue()
|
||||
{
|
||||
if (is_resource($this->_tmpHandle))
|
||||
if (is_resource($this->tmpHandle))
|
||||
{
|
||||
if (file_exists($this->_path.'queue.txt'))
|
||||
if (file_exists($this->queueFile))
|
||||
{
|
||||
$queueFileHandle = fopen($this->_path.'queue.txt', 'r+');
|
||||
$queueFileHandle = fopen($this->queueFile, 'r+');
|
||||
while (($line = fgets($queueFileHandle)) !== false)
|
||||
{
|
||||
if ($line !== "\n" && $line !== "")
|
||||
fwrite($this->_tmpHandle, $line);
|
||||
fwrite($this->tmpHandle, $line);
|
||||
}
|
||||
|
||||
fclose($queueFileHandle);
|
||||
unlink($this->_path.'queue.txt');
|
||||
unlink($this->queueFile);
|
||||
}
|
||||
|
||||
fclose($this->_tmpHandle);
|
||||
rename($this->_path.'queue.tmp', $this->_path.'queue.txt');
|
||||
fclose($this->tmpHandle);
|
||||
rename($this->queueTmpFile, $this->queueFile);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int
|
||||
* @throws \RuntimeException
|
||||
*/
|
||||
public function getQueueItemCount()
|
||||
{
|
||||
return FileHelper::getLineCount($this->_path.'queue.txt');
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @return bool
|
||||
@@ -403,13 +426,11 @@ HTML;
|
||||
$key = (string)$key;
|
||||
|
||||
// Try to open the file for reading / writing.
|
||||
$queueFileHandle = fopen($this->_path.'queue.txt', 'r');
|
||||
$queueFileHandle = fopen($this->queueFile, 'r');
|
||||
|
||||
while(($line = fscanf($queueFileHandle, "%s\t%s\n")) !== false)
|
||||
{
|
||||
list ($lineKey, $lineValue) = $line;
|
||||
|
||||
if ($key === $lineKey)
|
||||
if ($key === $line[0])
|
||||
{
|
||||
fclose($queueFileHandle);
|
||||
return true;
|
||||
@@ -429,7 +450,7 @@ HTML;
|
||||
*/
|
||||
public function count()
|
||||
{
|
||||
return $this->getQueueItemCount();
|
||||
return (int)FileHelper::getLineCount($this->queueFile);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -441,7 +462,7 @@ HTML;
|
||||
*/
|
||||
public function serialize()
|
||||
{
|
||||
return serialize(array($this->_name, $this->_path));
|
||||
return serialize(array($this->name, $this->path));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -456,8 +477,8 @@ HTML;
|
||||
{
|
||||
/** @var \DCarbone\UglyQueue $uglyQueue */
|
||||
$data = unserialize($serialized);
|
||||
$this->_name = $data[0];
|
||||
$this->_path = $data[1];
|
||||
$this->name = $data[0];
|
||||
$this->path = $data[1];
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -470,8 +491,8 @@ HTML;
|
||||
*/
|
||||
public function attach(\SplObserver $observer)
|
||||
{
|
||||
if (!in_array($observer, $this->observers))
|
||||
$this->observers[] = $observer;
|
||||
if (!in_array($observer, $this->_observers))
|
||||
$this->_observers[] = $observer;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -484,9 +505,9 @@ HTML;
|
||||
*/
|
||||
public function detach(\SplObserver $observer)
|
||||
{
|
||||
$idx = array_search($observer, $this->observers, true);
|
||||
$idx = array_search($observer, $this->_observers, true);
|
||||
if ($idx !== false)
|
||||
unset($this->observers[$idx]);
|
||||
unset($this->_observers[$idx]);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -498,9 +519,9 @@ HTML;
|
||||
*/
|
||||
public function notify()
|
||||
{
|
||||
for ($i = 0, $count = count($this->observers); $i < $count; $i++)
|
||||
foreach($this->_observers as $observer)
|
||||
{
|
||||
$this->observers[$i]->notify($this);
|
||||
$observer->update($this);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user