From 89c51468ebc7dac69bbb99f6b2a6b163a4c27a28 Mon Sep 17 00:00:00 2001 From: Daniel Carbone Date: Mon, 29 Sep 2014 16:26:53 -0500 Subject: [PATCH] Huge update. --- src/UglyQueue.php | 495 +++++++++++++++++++++++---------------- src/UglyQueueManager.php | 234 ++++++++++++++++++ 2 files changed, 526 insertions(+), 203 deletions(-) create mode 100644 src/UglyQueueManager.php diff --git a/src/UglyQueue.php b/src/UglyQueue.php index fbd0333..46fef70 100644 --- a/src/UglyQueue.php +++ b/src/UglyQueue.php @@ -5,159 +5,89 @@ use DCarbone\Helpers\FileHelper; /** * Class UglyQueue * @package DCarbone + * + * @property string name + * @property string path + * @property bool locked */ -class UglyQueue +class UglyQueue implements \Serializable, \SplSubject { + const NOTIFY_QUEUE_INITIALIZED = 0; + const NOTIFY_QUEUE_LOCKED = 1; + const NOTIFY_QUEUE_FAILED_TO_LOCK = 2; + const NOTIFY_QUEUE_LOCKED_BY_OTHER_PROCESS = 3; + const NOTIFY_QUEUE_UNLOCKED = 4; + const NOTIFY_QUEUE_PROCESSING = 5; + const NOTIFY_QUEUE_REACHED_END = 6; + + /** @var int */ + public $notifyStatus; + + const QUEUE_READONLY = 0; + const QUEUE_READWRITE = 1; + /** @var array */ - protected $config; + private $observers = array(); + + /** @var int */ + protected $mode = null; /** @var string */ - protected $queueBaseDir; + protected $_name; /** @var string */ - protected $queueGroup = null; - - /** @var string */ - protected $queueGroupDirPath = null; + protected $_path; /** @var bool */ - protected $haveLock = false; - - /** @var bool */ - protected $init = false; + protected $_locked = false; /** @var resource */ protected $_tmpHandle; /** - * @param array $config + * @param string $directoryPath + * @param array $observers * @throws \RuntimeException * @throws \InvalidArgumentException + * @return UglyQueue */ - public function __construct(array $config) + public static function queueWithGroupDirectoryPathAndObservers($directoryPath, array $observers = array()) { - if (!isset($config['queue-base-dir'])) - throw new \InvalidArgumentException('UglyQueue::__construct - "$config" parameter "queue-base-dir" not seen.'); + if (!is_string($directoryPath)) + throw new \InvalidArgumentException('Argument 1 expected to be string, '.gettype($directoryPath).' seen'); - if (!is_dir($config['queue-base-dir']) || !is_writable($config['queue-base-dir'])) - throw new \RuntimeException('UglyQueue::__construct - "$config[\'queue-base-dir\']" points to a directory that either doesn\'t exist or is not writable'); + if (($directoryPath = trim($directoryPath)) === '') + throw new \InvalidArgumentException('Empty string passed for argument 1'); - $this->config = $config; - - $this->queueBaseDir = $this->config['queue-base-dir']; - } - - /** - * Destructor - */ - public function __destruct() - { - $this->unlock(); - $this->_populateQueue(); - } - - /** - * @param int $ttl Time to live in seconds - * @throws \InvalidArgumentException - * @return bool - */ - public function lock($ttl = 250) - { - if (!is_int($ttl)) - throw new \InvalidArgumentException('UglyQueue::lock - Argument 1 expected to be positive integer, "'.gettype($ttl).'" seen'); - - if ($ttl < 0) - throw new \InvalidArgumentException('UglyQueue::lock - Argument 1 expected to be positive integer, "'.$ttl.'" seen'); - - $already_locked = $this->isLocked(); - - // If there is no lock, currently - if ($already_locked === false) - return $this->haveLock = $this->createQueueLock($ttl); - - // If we make it this far, there is already a lock in place. - return $this->haveLock = false; - } - - /** - * @param int $ttl seconds to live - * @return bool - */ - protected function createQueueLock($ttl) - { - $ok = (bool)@file_put_contents( - $this->queueGroupDirPath.'queue.lock', - json_encode(array('ttl' => $ttl, 'born' => time()))); - - if ($ok !== true) - return false; - - $this->haveLock = true; - return true; - } - - /** - * Close this ugly queue, writing out contents to file. - */ - public function unlock() - { - if ($this->haveLock === true) + if (file_exists($directoryPath)) { - unlink($this->queueGroupDirPath.'queue.lock'); - $this->haveLock = false; + if (!is_dir($directoryPath)) + throw new \RuntimeException('Argument 1 expected to be path to directory, path to non-directory seen'); } - } - - /** - * @throws \RuntimeException - * @return bool - */ - public function isLocked() - { - if ($this->init === false) - throw new \RuntimeException('UglyQueue::isLocked - Must first initialize queue'); - - // First check for lock file - if (is_file($this->queueGroupDirPath.'queue.lock')) + else if (!@mkdir($directoryPath)) { - $lock = json_decode(file_get_contents($this->queueGroupDirPath.'queue.lock'), true); - - // If we have an invalid lock structure, THIS IS BAD. - if (!isset($lock['ttl']) || !isset($lock['born'])) - throw new \RuntimeException('UglyQueue::isLocked - Invalid "queue.lock" file structure seen at "'.$this->queueGroupDirPath.'queue.lock'.'"'); - - // Otherwise... - $lock_ttl = ((int)$lock['born'] + (int)$lock['ttl']); - - // If we're within the TTL of the lock, assume another thread is already processing. - // We'll pick it up on the next go around. - if ($lock_ttl > time()) - return true; - - // Else, remove lock file and assume we're good to go! - unlink($this->queueGroupDirPath.'queue.lock'); - return false; + throw new \RuntimeException('Unable to create queue directory at path: "'.$directoryPath.'".'); } - // If no file, assume not locked. - return false; - } + $uglyQueue = new UglyQueue(); + $uglyQueue->observers = $observers; - /** - * @param string $queueGroup - */ - public function initialize($queueGroup) - { - $this->queueGroup = $queueGroup; - $this->queueGroupDirPath = $this->queueBaseDir.$queueGroup.DIRECTORY_SEPARATOR; + $split = preg_split('#[/\\]+/#', $directoryPath); - // Create directory for this queue group - if (!is_dir($this->queueGroupDirPath)) - mkdir($this->queueGroupDirPath); + $uglyQueue->_name = end($split); + $uglyQueue->_path = rtrim(realpath(implode(DIRECTORY_SEPARATOR, $split)), "/\\").DIRECTORY_SEPARATOR; + + if (is_writable($uglyQueue->_path)) + $uglyQueue->mode = self::QUEUE_READWRITE; + else if (is_readable($uglyQueue->_path)) + $uglyQueue->mode = self::QUEUE_READONLY; // Insert "don't look here" index.html file - if (!file_exists($this->queueGroupDirPath.'index.html')) + if (!file_exists($uglyQueue->_path.'index.html')) { + if ($uglyQueue->mode === self::QUEUE_READONLY) + throw new \RuntimeException('Cannot initialize queue with name "'.$uglyQueue->_name.'", the user lacks permission to create files.'); + $html = << @@ -168,13 +98,153 @@ class UglyQueue HTML; - file_put_contents($this->queueGroupDirPath.'index.html', $html); + file_put_contents($uglyQueue->_path.'index.html', $html); } - if (!file_exists($this->queueGroupDirPath.'queue.txt')) - file_put_contents($this->queueGroupDirPath.'queue.txt', ''); + if (!file_exists($uglyQueue->_path.'queue.txt')) + { + if ($uglyQueue->mode === self::QUEUE_READONLY) + throw new \RuntimeException('Cannot initialize queue with name "'.$uglyQueue->_name.'", the user lacks permission to create files.'); - $this->init = true; + file_put_contents($uglyQueue->_path.'queue.txt', ''); + } + + $uglyQueue->notifyStatus = self::NOTIFY_QUEUE_INITIALIZED; + $uglyQueue->notify(); + + return $uglyQueue; + } + + /** + * @param string $param + * @return string + * @throws \OutOfBoundsException + */ + public function __get($param) + { + switch($param) + { + case 'name' : + return $this->_name; + + case 'path': + return $this->_path; + + case 'locked': + return $this->_locked; + + default: + throw new \OutOfBoundsException(get_class($this).' does not have a property named "'.$param.'".'); + } + } + + /** + * Destructor + */ + public function __destruct() + { + $this->_populateQueue(); + $this->unlock(); + file_put_contents($this->_path.UglyQueueManager::UGLY_QUEUE_SERIALIZED_NAME, serialize($this)); + } + + /** + * @param int $ttl Time to live in seconds + * @throws \InvalidArgumentException + * @return bool + */ + public function lock($ttl = 250) + { + if (!is_int($ttl)) + throw new \InvalidArgumentException('Argument 1 expected to be integer, "'.gettype($ttl).'" seen'); + + if ($ttl < 0) + throw new \InvalidArgumentException('Argument 1 expected to be positive integer, "'.$ttl.'" seen'); + + $alreadyLocked = $this->isLocked(); + + // If there is currently no lock + if ($alreadyLocked === false) + return $this->createLockFile($ttl); + + // If we make it this far, there is already a lock in place. + $this->_locked = false; + $this->notifyStatus = self::NOTIFY_QUEUE_LOCKED_BY_OTHER_PROCESS; + $this->notify(); + + return false; + } + + /** + * @param int $ttl seconds to live + * @return bool + */ + protected function createLockFile($ttl) + { + $ok = (bool)@file_put_contents( + $this->_path.'queue.lock', + json_encode(array('ttl' => $ttl, 'born' => time()))); + + if ($ok !== true) + { + $this->notifyStatus = self::NOTIFY_QUEUE_FAILED_TO_LOCK; + $this->notify(); + return $this->_locked = false; + } + + $this->_locked = true; + + $this->notifyStatus = self::NOTIFY_QUEUE_LOCKED; + $this->notify(); + + return true; + } + + /** + * Close this ugly queue, writing out contents to file. + */ + public function unlock() + { + if ($this->_locked === true) + { + unlink($this->_path.'queue.lock'); + $this->_locked = false; + + $this->notifyStatus = self::NOTIFY_QUEUE_UNLOCKED; + $this->notify(); + } + } + + /** + * @throws \RuntimeException + * @return bool + */ + public function isLocked() + { + // First check for lock file + if (is_file($this->_path.'queue.lock')) + { + $lock = json_decode(file_get_contents($this->_path.'queue.lock'), true); + + // If we have an invalid lock structure, THIS IS BAD. + if (!isset($lock['ttl']) || !isset($lock['born'])) + throw new \RuntimeException(get_class($this).'::isLocked - Invalid "queue.lock" file structure seen at "'.$this->_path.'queue.lock'.'"'); + + // Otherwise... + $lock_ttl = ((int)$lock['born'] + (int)$lock['ttl']); + + // If we're within the TTL of the lock, assume another thread is already processing. + // We'll pick it up on the next go around. + if ($lock_ttl > time()) + return true; + + // Else, remove lock file and assume we're good to go! + unlink($this->_path.'queue.lock'); + return false; + } + + // If no file, assume not locked. + return false; } /** @@ -185,30 +255,37 @@ HTML; */ public function processQueue($count = 1) { - if ($this->init === false) - throw new \RuntimeException('UglyQueue::processQueue - Must first initialize queue!'); + if ($this->mode === self::QUEUE_READONLY) + throw new \RuntimeException('Queue "'.$this->_name.'" cannot be processed.'. + ' It was started in Read-Only mode (the user running this process does not have permission to write to the queue directory).'); // If we don't have a lock, assume issue and move on. - if ($this->haveLock === false) - throw new \RuntimeException('UglyQueue::processQueue - Cannot process queue locked by another process'); + if ($this->_locked === false) + throw new \RuntimeException('Cannot process queue named "'.$this->_name.'". It is locked by another process.'); // If non-int valid is passed if (!is_int($count)) - throw new \InvalidArgumentException('UglyQueue::processQueue - Argument 1 expected to be integer greater than 0, "'.gettype($count).'" seen'); + throw new \InvalidArgumentException('Argument 1 expected to be integer greater than 0, "'.gettype($count).'" seen'); // If negative integer passed if ($count <= 0) - throw new \InvalidArgumentException('UglyQueue::processQueue - Argument 1 expected to be integer greater than 0, "'.$count.'" seen'); + throw new \InvalidArgumentException('Argument 1 expected to be integer greater than 0, "'.$count.'" seen'); + + if ($this->notifyStatus !== self::NOTIFY_QUEUE_PROCESSING) + { + $this->notifyStatus = self::NOTIFY_QUEUE_PROCESSING; + $this->notify(); + } // Find number of lines in the queue file - $lineCount = FileHelper::getLineCount($this->queueGroupDirPath.'queue.txt'); + $lineCount = FileHelper::getLineCount($this->_path.'queue.txt'); // If queue line count is 0, assume empty if ($lineCount === 0) return false; // Try to open the file for reading / writing. - $queueFileHandle = fopen($this->queueGroupDirPath.'queue.txt', 'r+'); + $queueFileHandle = fopen($this->_path.'queue.txt', 'r+'); if ($queueFileHandle === false) $this->unlock(); @@ -231,11 +308,14 @@ HTML; rewind($queueFileHandle); ftruncate($queueFileHandle, 0); fclose($queueFileHandle); + + $this->notifyStatus = self::NOTIFY_QUEUE_REACHED_END; + $this->notify(); } // Otherwise, create new queue file minus the processed lines. else { - $tmp = fopen($this->queueGroupDirPath.'queue.tmp', 'w+'); + $tmp = fopen($this->_path.'queue.tmp', 'w+'); rewind($queueFileHandle); $i = 0; while (($line = fgets($queueFileHandle)) !== false && $i < $start_line) @@ -248,8 +328,8 @@ HTML; fclose($queueFileHandle); fclose($tmp); - unlink($this->queueGroupDirPath.'queue.txt'); - rename($this->queueGroupDirPath.'queue.tmp', $this->queueGroupDirPath.'queue.txt'); + unlink($this->_path.'queue.txt'); + rename($this->_path.'queue.tmp', $this->_path.'queue.txt'); } return $data; @@ -263,18 +343,18 @@ HTML; */ public function addToQueue($key, $value) { - if ($this->init === false) - throw new \RuntimeException('UglyQueue::addToQueue - Must first initialize queue!'); + if ($this->mode === self::QUEUE_READONLY) + throw new \RuntimeException('Cannot add items to queue "'.$this->_name.'" as it is in read-only mode'); // If we don't have a lock, assume issue and move on. - if ($this->haveLock === false) - throw new \RuntimeException('UglyQueue::addToQueue - You do not have a lock on this queue'); + if ($this->_locked === false) + throw new \RuntimeException('Cannot add items to queue "'.$this->_name.'". Queue is already locked by another process'); if (!is_resource($this->_tmpHandle)) { - $this->_tmpHandle = fopen($this->queueGroupDirPath.'queue.tmp', 'w+'); + $this->_tmpHandle = fopen($this->_path.'queue.tmp', 'w+'); if ($this->_tmpHandle === false) - throw new \RuntimeException('UglyQueue::addToQueue - Unable to create "queue.tmp" file'); + throw new \RuntimeException('Unable to create "queue.tmp" file.'); } if (is_array($value) || $value instanceof \stdClass) @@ -295,9 +375,9 @@ HTML; { if (is_resource($this->_tmpHandle)) { - if (file_exists($this->queueGroupDirPath.'queue.txt')) + if (file_exists($this->_path.'queue.txt')) { - $queueFileHandle = fopen($this->queueGroupDirPath.'queue.txt', 'r+'); + $queueFileHandle = fopen($this->_path.'queue.txt', 'r+'); while (($line = fgets($queueFileHandle)) !== false) { if ($line !== "\n" && $line !== "") @@ -305,11 +385,11 @@ HTML; } fclose($queueFileHandle); - unlink($this->queueGroupDirPath.'queue.txt'); + unlink($this->_path.'queue.txt'); } fclose($this->_tmpHandle); - rename($this->queueGroupDirPath.'queue.tmp', $this->queueGroupDirPath.'queue.txt'); + rename($this->_path.'queue.tmp', $this->_path.'queue.txt'); } } @@ -319,10 +399,7 @@ HTML; */ public function getQueueItemCount() { - if ($this->init === false) - throw new \RuntimeException('UglyQueue::getQueueItemCount - Must first initialize queue'); - - return FileHelper::getLineCount($this->queueGroupDirPath.'queue.txt'); + return FileHelper::getLineCount($this->_path.'queue.txt'); } /** @@ -332,19 +409,16 @@ HTML; */ public function keyExistsInQueue($key) { - if ($this->init === false) - throw new \RuntimeException('UglyQueue::keyExistsInQueue - Must first initialize queue'); - $key = (string)$key; // Try to open the file for reading / writing. - $queueFileHandle = fopen($this->queueGroupDirPath.'queue.txt', 'r'); + $queueFileHandle = fopen($this->_path.'queue.txt', 'r'); while(($line = fscanf($queueFileHandle, "%s\t")) !== false) { - list($queueKey) = $line; + list ($lineKey, $lineValue) = $line; - if ($key === $queueKey) + if ($key === $lineKey) { fclose($queueFileHandle); return true; @@ -356,59 +430,74 @@ HTML; } /** - * @param string $groupName - * @return bool + * (PHP 5 >= 5.1.0) + * String representation of object + * @link http://php.net/manual/en/serializable.serialize.php + * + * @return string the string representation of the object or null */ - public function queueExists($groupName) + public function serialize() { - return (bool)is_dir($this->queueBaseDir.$groupName); + return serialize(array($this->_name, $this->_path)); } /** - * @return array + * (PHP 5 >= 5.1.0) + * Constructs the object + * @link http://php.net/manual/en/serializable.unserialize.php + * + * @param string $serialized The string representation of the object. + * @return void */ - public function getInitializedQueueList() + public function unserialize($serialized) { - $queueList = array(); - foreach(glob(realpath($this->queueBaseDir).DIRECTORY_SEPARATOR.'*', GLOB_ONLYDIR) as $queueDir) + /** @var \DCarbone\UglyQueue $uglyQueue */ + $data = unserialize($serialized); + $this->_name = $data[0]; + $this->_path = $data[1]; + } + + /** + * (PHP 5 >= 5.1.0) + * Attach an SplObserver + * @link http://php.net/manual/en/splsubject.attach.php + * + * @param \SplObserver $observer The SplObserver to attach. + * @return void + */ + public function attach(\SplObserver $observer) + { + if (!in_array($observer, $this->observers)) + $this->observers[] = $observer; + } + + /** + * (PHP 5 >= 5.1.0) + * Detach an observer + * @link http://php.net/manual/en/splsubject.detach.php + * + * @param \SplObserver $observer The SplObserver to detach. + * @return void + */ + public function detach(\SplObserver $observer) + { + $idx = array_search($observer, $this->observers, true); + if ($idx !== false) + unset($this->observers[$idx]); + } + + /** + * (PHP 5 >= 5.1.0) + * Notify an observer + * @link http://php.net/manual/en/splsubject.notify.php + * + * @return void + */ + public function notify() + { + for ($i = 0, $count = count($this->observers); $i < $count; $i++) { - $exp = explode(DIRECTORY_SEPARATOR, $queueDir); - $dir = end($exp); - if (strpos($dir, '.') !== 0) - $queueList[] = $dir; + $this->observers[$i]->notify($this); } - return $queueList; - } - - /** - * @return boolean - */ - public function getInit() - { - return $this->init; - } - - /** - * @return string - */ - public function getQueueBaseDir() - { - return $this->queueBaseDir; - } - - /** - * @return string - */ - public function getQueueGroupDirPath() - { - return $this->queueGroupDirPath; - } - - /** - * @return string - */ - public function getQueueGroup() - { - return $this->queueGroup; } } \ No newline at end of file diff --git a/src/UglyQueueManager.php b/src/UglyQueueManager.php new file mode 100644 index 0000000..83fa7c8 --- /dev/null +++ b/src/UglyQueueManager.php @@ -0,0 +1,234 @@ +notifyStatus = self::NOTIFY_MANAGER_INITIALIZED; + + $this->config = $config; + $this->queueBaseDir = rtrim(realpath($this->config['queue-base-dir']), "/\\").DIRECTORY_SEPARATOR; + $this->observers = $observers; + } + + /** + * @param array $config + * @param array $observers + * @return UglyQueueManager + */ + public static function init(array $config, array $observers = array()) + { + /** @var \DCarbone\UglyQueueManager $manager */ + $manager = new static($config, array()); + + $manager->observers = $observers; + + $manager->notify(); + + /** @var \DCarbone\UglyQueue $uglyQueue */ + + foreach(glob($manager->queueBaseDir.DIRECTORY_SEPARATOR.'*', GLOB_ONLYDIR) as $queueDir) + { + // Try to avoid looking at hidden directories or magic dirs such as '.' and '..' + $split = preg_split('#[/\\]+#', $queueDir); + if (strpos(end($split), '.') === 0) + continue; + + if (file_exists($queueDir.DIRECTORY_SEPARATOR.self::UGLY_QUEUE_SERIALIZED_NAME)) + $uglyQueue = unserialize(file_get_contents($queueDir.DIRECTORY_SEPARATOR.self::UGLY_QUEUE_SERIALIZED_NAME)); + else + $uglyQueue = UglyQueue::queueWithGroupDirectoryPathAndObservers($queueDir, $manager->observers); + + $manager->addQueue($uglyQueue); + } + + return $manager; + } + + /** + * @param UglyQueue $uglyQueue + * @return \DCarbone\UglyQueueManager + * @throws \InvalidArgumentException + */ + public function addQueue(UglyQueue $uglyQueue) + { + if ($this->queueExistsByName($uglyQueue->name)) + throw new \InvalidArgumentException('Queue named "'.$uglyQueue->name.'" already exists in this manager.'); + + + $this->queues[$uglyQueue->name] = $uglyQueue; + + $this->notifyStatus = self::NOTIFY_QUEUE_ADDED; + $this->notify(); + + return $this; + } + + /** + * @param $path + * @return \DCarbone\UglyQueueManager + */ + public function addQueueAtPath($path) + { + $uglyQueue = UglyQueue::queueWithGroupDirectoryPathAndObservers($path, $this->observers); + + return $this->addQueue($uglyQueue); + } + + /** + * @param UglyQueue $uglyQueue + * @return \DCarbone\UglyQueueManager + */ + public function removeQueue(UglyQueue $uglyQueue) + { + if ($this->queueExistsByName($uglyQueue->name)) + unset($this->queues[$uglyQueue->name]); + + return $this; + } + + /** + * @param string $name + * @return \DCarbone\UglyQueueManager + */ + public function removeQueueByName($name) + { + if ($this->queueExistsByName($name)) + { + unset($this->queues[$name]); + $this->notifyStatus = self::NOTIFY_QUEUE_REMOVED; + $this->notify(); + } + + return $this; + } + + /** + * @param string $name + * @return \DCarbone\UglyQueue + * @throws \InvalidArgumentException + */ + public function &getQueueWithName($name) + { + if (isset($this->queues[$name])) + return $this->queues[$name]; + + throw new \InvalidArgumentException('Argument 1 expected to be valid queue name.'); + } + + /** + * @param string $name + * @return bool + */ + public function queueExistsByName($name) + { + return isset($this->queues[$name]); + } + + /** + * @return array + */ + public function getQueueList() + { + return array_keys($this->queues); + } + + /** + * (PHP 5 >= 5.1.0) + * Receive update from subject + * @link http://php.net/manual/en/splobserver.update.php + * + * @param \SplSubject $subject The SplSubject notifying the observer of an update. + * @return void + */ + public function update(\SplSubject $subject) + { + for ($i = 0, $count = count($this->observers); $i < $count; $i++) + { + $this->observers[$i]->notify($subject); + } + } + + /** + * (PHP 5 >= 5.1.0) + * Attach an SplObserver + * @link http://php.net/manual/en/splsubject.attach.php + * + * @param \SplObserver $observer The SplObserver to attach. + * @return void + */ + public function attach(\SplObserver $observer) + { + if (!in_array($observer, $this->observers, true)) + $this->observers[] = $observer; + } + + /** + * (PHP 5 >= 5.1.0) + * Detach an observer + * @link http://php.net/manual/en/splsubject.detach.php + * + * @param \SplObserver $observer The SplObserver to detach. + * @return void + */ + public function detach(\SplObserver $observer) + { + $idx = array_search($observer, $this->observers, true); + if ($idx !== false) + unset($this->observers[$idx]); + } + + /** + * (PHP 5 >= 5.1.0) + * Notify an observer + * @link http://php.net/manual/en/splsubject.notify.php + * + * @return void + */ + public function notify() + { + for ($i = 0, $count = count($this->observers); $i < $count; $i++) + { + $this->observers[$i]->notify($this); + } + } +} \ No newline at end of file