/Predis/AbortedMultiExec.phptMt6Predis/Client.phphMh0vܶPredis/ClientException.phpfMfL Predis/ClientOptions.phpnMn=EPredis/Commands/Append.phpM^+ Predis/Commands/Auth.phpMլ3Predis/Commands/BackgroundRewriteAppendOnlyFile.php_M_Kl"Predis/Commands/BackgroundSave.phplMl5[sPredis/Commands/Command.php M Predis/Commands/Config.phpM|} Predis/Commands/DatabaseSize.phpM9퍶Predis/Commands/DebugObject.phpM:Predis/Commands/Decrement.phpMPredis/Commands/DecrementBy.phpMwPredis/Commands/Delete.phpMnPredis/Commands/Discard.phpM2"$Predis/Commands/DoEcho.phpM|Predis/Commands/Exec.phpMDPredis/Commands/Exists.phpMBDPredis/Commands/Expire.phpM% :>Predis/Commands/ExpireAt.phpM% Predis/Commands/FlushAll.phpM3E!Predis/Commands/FlushDatabase.phpMUPredis/Commands/Get.php{M{Predis/Commands/GetBit.phpM&;Predis/Commands/GetMultiple.phpM0c ٶPredis/Commands/GetRange.phpMHSgPredis/Commands/GetSet.phpMZPredis/Commands/HashDelete.phpM{"Predis/Commands/HashDeleteV24x.phpM\Predis/Commands/HashExists.phpMqŶPredis/Commands/HashGet.phpMRPredis/Commands/HashGetAll.phpMqF#Predis/Commands/HashGetMultiple.phpMpu@#Predis/Commands/HashIncrementBy.phpMfPredis/Commands/HashKeys.phpM&mPredis/Commands/HashLength.phpM]Predis/Commands/HashSet.phpMk#Predis/Commands/HashSetMultiple.php*M*N-#Predis/Commands/HashSetPreserve.phpM/Predis/Commands/HashValues.phpMPredis/Commands/ICommand.phpBMBPredis/Commands/Increment.phpM^ǶPredis/Commands/IncrementBy.phpMZӶPredis/Commands/Info.php3M3Predis/Commands/InfoV24x.phpMvPredis/Commands/Keys.phpM7gPredis/Commands/KeysV12x.phpM<(Predis/Commands/LastSave.phpM?d0Predis/Commands/ListIndex.phpMPredis/Commands/ListInsert.phpMniYPredis/Commands/ListLength.phpMb Predis/Commands/ListPopFirst.phpMe. (Predis/Commands/ListPopFirstBlocking.phpAMA0#Predis/Commands/ListPopLast.phpM'Predis/Commands/ListPopLastBlocking.phpM \'Predis/Commands/ListPopLastPushHead.phpM >/Predis/Commands/ListPopLastPushHeadBlocking.phpKMK # Predis/Commands/ListPushHead.phpM}$Predis/Commands/ListPushHeadV24x.phpM;!Predis/Commands/ListPushHeadX.phpMh Predis/Commands/ListPushTail.phpMr$Predis/Commands/ListPushTailV24x.phpuMu޶!Predis/Commands/ListPushTailX.phpMN0Predis/Commands/ListRange.phpM s rPredis/Commands/ListRemove.phpMĖPredis/Commands/ListSet.phpMjwPredis/Commands/ListTrim.phpM<Predis/Commands/MoveKey.phpMGL϶Predis/Commands/Multi.phpM 2Predis/Commands/Persist.phpM_^Predis/Commands/Ping.php#M#8:6Predis/Commands/Preprocessors/ICommandPreprocessor.phpMs";Predis/Commands/Preprocessors/ICommandPreprocessorChain.php@M@xB7Predis/Commands/Preprocessors/IPreprocessingSupport.phpM7Predis/Commands/Preprocessors/KeyPrefixPreprocessor.php1M1BU3Predis/Commands/Preprocessors/PreprocessorChain.php^M^9| Predis/Commands/Publish.phpMhGFPredis/Commands/Quit.phpMYPredis/Commands/RandomKey.php)M)#?նPredis/Commands/Rename.phpM%-"Predis/Commands/RenamePreserve.phpM(Predis/Commands/Save.phpMqu"Predis/Commands/SelectDatabase.phpMePredis/Commands/Set.php{M{-Predis/Commands/SetAdd.phpM?'Predis/Commands/SetAddV24x.phpMoPredis/Commands/SetBit.phpM橤"Predis/Commands/SetCardinality.phpMB!Predis/Commands/SetDifference.phpM̧z&Predis/Commands/SetDifferenceStore.phpMa5Predis/Commands/SetExpire.phpMͶ#Predis/Commands/SetIntersection.phpMX̶(Predis/Commands/SetIntersectionStore.phpMnIٶPredis/Commands/SetIsMember.phpM 6Predis/Commands/SetMembers.phpM<Predis/Commands/SetMove.phpMkQPredis/Commands/SetMultiple.phpMqn"'Predis/Commands/SetMultiplePreserve.phpMBLPredis/Commands/SetPop.phpM8"9Predis/Commands/SetPreserve.phpM`^Q#Predis/Commands/SetRandomMember.phpM*jPredis/Commands/SetRange.phpMPredis/Commands/SetRemove.phpM Predis/Commands/SetUnion.phpMO!Predis/Commands/SetUnionStore.phpM2϶Predis/Commands/Shutdown.phpM6Predis/Commands/SlaveOf.phpMՎPredis/Commands/Sort.phpMIPredis/Commands/Strlen.phpM7yPredis/Commands/Subscribe.phpYMY\}&Predis/Commands/SubscribeByPattern.phpcMc?Predis/Commands/Substr.phpM`Predis/Commands/TimeToLive.phpM3!Predis/Commands/Type.php}M}QPredis/Commands/Unsubscribe.phpM!s (Predis/Commands/UnsubscribeByPattern.phpM=ˠPredis/Commands/Unwatch.phpMOѶPredis/Commands/Watch.phpMPredis/Commands/ZSetAdd.phpMH#Predis/Commands/ZSetCardinality.phpM?Predis/Commands/ZSetCount.phpM՟#Predis/Commands/ZSetIncrementBy.phpM 4޶)Predis/Commands/ZSetIntersectionStore.phpM.cPredis/Commands/ZSetRange.php1M1܋X$Predis/Commands/ZSetRangeByScore.phpMzPredis/Commands/ZSetRank.phpM;Predis/Commands/ZSetRemove.phpMτ&)Predis/Commands/ZSetRemoveRangeByRank.phpM`*Predis/Commands/ZSetRemoveRangeByScore.phpM$Predis/Commands/ZSetReverseRange.phpMqv+Predis/Commands/ZSetReverseRangeByScore.phpMV##Predis/Commands/ZSetReverseRank.phpM=HPredis/Commands/ZSetScore.phpMlmnW"Predis/Commands/ZSetUnionStore.phpM&!Predis/CommunicationException.php7M7m߶Predis/ConnectionException.phpmMmV.>Predis/ConnectionFactory.phpMPPredis/ConnectionParameters.phpMa=*Predis/Distribution/EmptyRingException.phpWMWݶ Predis/Distribution/HashRing.phpNMN -Predis/Distribution/IDistributionStrategy.phpMJ!)Predis/Distribution/INodeKeyGenerator.phppMpqC&Predis/Distribution/KetamaPureRing.php]M]FPredis/Helpers.phpMWƶPredis/IConnectionFactory.phpdMdU̶ Predis/IConnectionParameters.phpMiFa&Predis/Iterators/MultiBulkResponse.phpMLX75,Predis/Iterators/MultiBulkResponseSimple.phpM+Predis/Iterators/MultiBulkResponseTuple.phpMZPredis/MultiExecContext.phpfMfDG-Predis/Network/ComposableStreamConnection.phph Mh )?A!Predis/Network/ConnectionBase.phpBMBx%$Predis/Network/ConnectionCluster.phpM$gPredis/Network/IConnection.php`M`dt%Predis/Network/IConnectionCluster.phpM׹(Predis/Network/IConnectionComposable.php]M]$Predis/Network/IConnectionSingle.php3M3Z$&Predis/Network/PhpiredisConnection.phpn#Mn#H֤a#Predis/Network/StreamConnection.php*M*]9$Predis/Options/ClientClusterType.phpbMbU=|*Predis/Options/ClientConnectionFactory.phpM6Q^(Predis/Options/ClientKeyDistribution.phpM1! Predis/Options/ClientProfile.php.M.Predis/Options/CustomOption.phpMPredis/Options/IOption.phpMR+Predis/Options/Option.phpnMnzŔ%Predis/Pipeline/IPipelineExecutor.phpM @'Predis/Pipeline/SafeClusterExecutor.phpoMo5 Predis/Pipeline/SafeExecutor.phpWMWж$Predis/Pipeline/StandardExecutor.phpM7%Predis/PipelineContext.phpMMMSAPredis/PredisException.phpsMs"Predis/Profiles/IServerProfile.php M qG!Predis/Profiles/ServerProfile.phpMݺ#Predis/Profiles/ServerVersion12.phpMeS#Predis/Profiles/ServerVersion20.php"M" #Predis/Profiles/ServerVersion22.phpc(Mc(L%Predis/Profiles/ServerVersionNext.php=M=!Predis/ProtocolException.php%M%|_'Predis/Protocols/ICommandSerializer.phpM)1Predis/Protocols/IComposableProtocolProcessor.php+M+P8'Predis/Protocols/IProtocolProcessor.php'M'=0f%Predis/Protocols/IResponseHandler.phpMI$Predis/Protocols/IResponseReader.phpMit0Predis/Protocols/Text/ComposableTextProtocol.phpMw~-Predis/Protocols/Text/ResponseBulkHandler.phpM"K.Predis/Protocols/Text/ResponseErrorHandler.phpdMd6¶4Predis/Protocols/Text/ResponseErrorSilentHandler.phpgMg,fڶ0Predis/Protocols/Text/ResponseIntegerHandler.phpIMI}2Predis/Protocols/Text/ResponseMultiBulkHandler.phpM *8Predis/Protocols/Text/ResponseMultiBulkStreamHandler.phpMӤr/Predis/Protocols/Text/ResponseStatusHandler.phpMB`/Predis/Protocols/Text/TextCommandSerializer.phpM}3&Predis/Protocols/Text/TextProtocol.php M 0G,Predis/Protocols/Text/TextResponseReader.phpFMFjMPredis/PubSubContext.phpMXPredis/ResponseError.php)M)u˗Predis/ResponseQueued.php@M@-Predis/ServerException.phpM_[filterOptions($options); $this->_options = $options; $this->_profile = $options->profile; $this->_connectionFactory = $options->connections; $this->_connection = $this->initializeConnection($parameters); } private function filterOptions($options) { if ($options === null) { return new ClientOptions(); } if ($options instanceof ClientOptions) { return $options; } if (is_array($options)) { return new ClientOptions($options); } if ($options instanceof IServerProfile) { return new ClientOptions(array('profile' => $options)); } if (is_string($options)) { return new ClientOptions(array('profile' => ServerProfile::get($options))); } throw new \InvalidArgumentException("Invalid type for client options"); } private function initializeConnection($parameters) { if ($parameters === null) { return $this->createConnection(new ConnectionParameters()); } if (is_array($parameters)) { if (isset($parameters[0])) { $clusterClass = $this->_options->cluster; $cluster = new $clusterClass($this->_options->key_distribution); foreach ($parameters as $single) { $cluster->add($single instanceof IConnectionSingle ? $single : $this->createConnection($single) ); } return $cluster; } return $this->createConnection($parameters); } if ($parameters instanceof IConnection) { return $parameters; } return $this->createConnection($parameters); } private function createConnection($parameters) { $connection = $this->_connectionFactory->create($parameters); $this->pushInitCommands($connection); return $connection; } private function pushInitCommands(IConnectionSingle $connection) { $params = $connection->getParameters(); if (isset($params->password)) { $connection->pushInitCommand($this->createCommand( 'auth', array($params->password) )); } if (isset($params->database)) { $connection->pushInitCommand($this->createCommand( 'select', array($params->database) )); } } public function getProfile() { return $this->_profile; } public function getOptions() { return $this->_options; } public function getConnectionFactory() { return $this->_connectionFactory; } public function getClientFor($connectionAlias) { if (($connection = $this->getConnection($connectionAlias)) === null) { throw new \InvalidArgumentException( "Invalid connection alias: '$connectionAlias'" ); } return new Client($connection, $this->_options); } public function connect() { $this->_connection->connect(); } public function disconnect() { $this->_connection->disconnect(); } public function quit() { $this->disconnect(); } public function isConnected() { return $this->_connection->isConnected(); } public function getConnection($id = null) { if (isset($id)) { if (!Helpers::isCluster($this->_connection)) { throw new ClientException( 'Retrieving connections by alias is supported '. 'only with clustered connections' ); } return $this->_connection->getConnectionById($id); } return $this->_connection; } public function __call($method, $arguments) { $command = $this->_profile->createCommand($method, $arguments); return $this->_connection->executeCommand($command); } public function createCommand($method, $arguments = array()) { return $this->_profile->createCommand($method, $arguments); } public function executeCommand(ICommand $command) { return $this->_connection->executeCommand($command); } public function executeCommandOnShards(ICommand $command) { if (Helpers::isCluster($this->_connection)) { $replies = array(); foreach ($this->_connection as $connection) { $replies[] = $connection->executeCommand($command); } return $replies; } return array($this->_connection->executeCommand($command)); } private function sharedInitializer($argv, $initializer) { switch (count($argv)) { case 0: return $this->$initializer(); case 1: list($arg0) = $argv; return is_array($arg0) ? $this->$initializer($arg0) : $this->$initializer(null, $arg0); case 2: list($arg0, $arg1) = $argv; return $this->$initializer($arg0, $arg1); default: return $this->$initializer($this, $argv); } } public function pipeline(/* arguments */) { return $this->sharedInitializer(func_get_args(), 'initPipeline'); } private function initPipeline(Array $options = null, $pipelineBlock = null) { $pipeline = null; if (isset($options)) { if (isset($options['safe']) && $options['safe'] == true) { $connection = $this->_connection; $pipeline = new PipelineContext($this, Helpers::isCluster($connection) ? new Pipeline\SafeClusterExecutor($connection) : new Pipeline\SafeExecutor($connection) ); } else { $pipeline = new PipelineContext($this); } } return $this->pipelineExecute( $pipeline ?: new PipelineContext($this), $pipelineBlock ); } private function pipelineExecute(PipelineContext $pipeline, $block) { return $block !== null ? $pipeline->execute($block) : $pipeline; } public function multiExec(/* arguments */) { return $this->sharedInitializer(func_get_args(), 'initMultiExec'); } private function initMultiExec(Array $options = null, $transBlock = null) { $multi = isset($options) ? new MultiExecContext($this, $options) : new MultiExecContext($this); return $transBlock !== null ? $multi->execute($transBlock) : $multi; } public function pubSubContext(Array $options = null) { return new PubSubContext($this, $options); } } initialize($options); } private static function getSharedOptions() { if (isset(self::$_sharedOptions)) { return self::$_sharedOptions; } self::$_sharedOptions = array( 'profile' => new ClientProfile(), 'key_distribution' => new ClientKeyDistribution(), 'connections' => new ClientConnectionFactory(), 'cluster' => new ClientClusterType(), ); return self::$_sharedOptions; } public static function define($option, IOption $handler) { self::getSharedOptions(); self::$_sharedOptions[$option] = $handler; } public static function undefine($option) { self::getSharedOptions(); unset(self::$_sharedOptions[$option]); } private function initialize($options) { $this->_handlers = self::getSharedOptions(); foreach ($options as $option => $value) { if (isset($this->_handlers[$option])) { $handler = $this->_handlers[$option]; $this->_options[$option] = $handler($value); } } } public function __get($option) { if (isset($this->_options[$option])) { return $this->_options[$option]; } if (isset($this->_handlers[$option])) { $opts = self::getSharedOptions(); $value = $opts[$option]->getDefault(); $this->_options[$option] = $value; return $value; } return null; } } _arguments = $this->filterArguments($arguments); unset($this->_hash); } public function getArguments() { return $this->_arguments; } public function getArgument($index = 0) { if (isset($this->_arguments[$index]) === true) { return $this->_arguments[$index]; } } protected function canBeHashed() { return isset($this->_arguments[0]); } protected function getHashablePart($key) { $start = strpos($key, '{'); if ($start !== false) { $end = strpos($key, '}', $start); if ($end !== false) { $key = substr($key, ++$start, $end - $start); } } return $key; } protected function checkSameHashForKeys(Array $keys) { if (($count = count($keys)) === 0) { return false; } $currentKey = $this->getHashablePart($keys[0]); for ($i = 1; $i < $count; $i++) { $nextKey = $this->getHashablePart($keys[$i]); if ($currentKey !== $nextKey) { return false; } $currentKey = $nextKey; } return true; } public function getHash(INodeKeyGenerator $distributor) { if (isset($this->_hash)) { return $this->_hash; } if ($this->canBeHashed()) { $key = $this->getHashablePart($this->_arguments[0]); $this->_hash = $distributor->generateKey($key); return $this->_hash; } return null; } public function parseResponse($data) { return $data; } public function __toString() { $reducer = function($acc, $arg) { if (strlen($arg) > 32) { $arg = substr($arg, 0, 32) . '[...]'; } $acc .= " $arg"; return $acc; }; return array_reduce($this->getArguments(), $reducer, $this->getId()); } } getArguments(); if (count($args) === 1) { return true; } return $this->checkSameHashForKeys($args); } } checkSameHashForKeys($this->getArguments()); } } $v) { $flattenedKVs[] = $k; $flattenedKVs[] = $v; } return $flattenedKVs; } return $arguments; } } parseAllocationStats($v); continue; } $info[$k] = $v; } else { $info[$k] = $this->parseDatabaseStats($v); } } return $info; } protected function parseDatabaseStats($str) { $db = array(); foreach (explode(',', $str) as $dbvar) { list($dbvk, $dbvv) = explode('=', $dbvar); $db[trim($dbvk)] = $dbvv; } return $db; } protected function parseAllocationStats($str) { $stats = array(); foreach (explode(',', $str) as $kv) { @list($size, $objects, $extra) = explode('=', $kv); // hack to prevent incorrect values when parsing the >=256 key if (isset($extra)) { $size = ">=$objects"; $objects = $extra; } $stats[$size] = $objects; } return $stats; } } parseAllocationStats($v); continue; } $current[$k] = $v; } else { $current[$k] = $this->parseDatabaseStats($v); } } return $info; } } checkSameHashForKeys( array_slice(($args = $this->getArguments()), 0, count($args) - 1) ); } } checkSameHashForKeys($this->getArguments()); } } checkSameHashForKeys( array_slice($args = $this->getArguments(), 0, count($args) - 1) ); } } checkProfile($profile); } $this->_prefix = $prefix; $this->_strategies = $this->getPrefixStrategies(); } protected function checkProfile(IServerProfile $profile) { if (!in_array($profile, $this->getSupportedProfiles())) { throw new ClientException("Unsupported profile: $profile"); } } protected function getSupportedProfiles() { return array('1.2', '2.0', '2.2'); } protected function getPrefixStrategies() { $singleKey = function(&$arguments, $prefix) { $arguments[0] = "$prefix{$arguments[0]}"; }; $multiKeys = function(&$arguments, $prefix) { if (count($arguments) === 1 && is_array($arguments[0])) { $arguments = &$arguments[0]; } foreach ($arguments as &$key) { $key = "$prefix$key"; } }; $skipLast = function(&$arguments, $prefix) { $length = count($arguments); for ($i = 0; $i < $length - 1; $i++) { $arguments[$i] = "$prefix{$arguments[$i]}"; } }; $interleavedKeys = function(&$arguments, $prefix) { $length = count($arguments); if ($length === 1 && is_array($arguments[0])) { $oldKvs = &$arguments[0]; $newKvs = array(); foreach ($oldKvs as $key => $value) { $newKvs["$prefix$key"] = $value; unset($oldKvs[$key]); } $arguments[0] = $newKvs; } else { for ($i = 0; $i < $length; $i += 2) { $arguments[$i] = "$prefix{$arguments[$i]}"; } } }; $zunionstore = function(&$arguments, $prefix) { $arguments[0] = "$prefix{$arguments[0]}"; if (is_array($arguments[1])) { foreach ($arguments[1] as &$destinationKey) { $destinationKey = "$prefix$destinationKey"; } $args = &$arguments[1]; $length = count($args); for ($i = 0; $i < $length; $i++) { $arguments[1][$i] = "$prefix{$args[$i]}"; } } else { $length = (int)$arguments[1]; for ($i = 2; $i < $length; $i++) { $arguments[$i] = "$prefix{$arguments[$i]}"; } } }; $sort = function(&$arguments, $prefix) { $arguments[0] = "$prefix{$arguments[0]}"; if (count($arguments) === 1) { return; } foreach ($arguments[1] as $modifier => &$value) { switch (strtoupper($modifier)) { case 'BY': case 'STORE': $value = "$prefix$value"; break; case 'GET': if (is_array($value)) { foreach ($value as &$getItem) { $getItem = "$prefix$getItem"; } } else { $value = "$prefix$value"; } break; } } }; $debug = function(&$arguments, $prefix) { if (count($arguments) === 3 && strtoupper($arguments[1]) == 'OBJECT') { $arguments[2] = "$prefix{$arguments[2]}"; } }; $cmdSingleKey = array( 'type', 'exists', 'move', 'expire', 'persist', 'sort', 'expireat', 'ttl', 'append', 'getrange', 'setnx', 'decr', 'getset', 'setrange', 'decrby', 'incr', 'set', 'strlen', 'get', 'incrby', 'setbit', 'getbit', 'setex', 'hdel', 'hgetall', 'hlen', 'hset', 'hexists', 'hincrby', 'hmget', 'hsetnx', 'hget', 'hkeys', 'hmset', 'hvals', 'lindex', 'linsert', 'llen', 'lpop', 'lpush', 'lpushx', 'rpushx', 'lrange', 'lrem', 'lset', 'ltrim', 'rpop', 'rpush', 'rpushx', 'sadd', 'scard', 'sismember', 'smembers', 'spop', 'srandmember', 'srem', 'zadd', 'zcard', 'zcount', 'zincrby', 'zrange', 'zrangebyscore', 'zrank', 'zrem', 'zremrangebyrank', 'zremrangebyscore', 'zrevrange', 'zrevrangebyscore', 'zrevrank', 'zscore', 'publish', 'keys', ); $cmdMultiKeys = array( 'del', 'rename', 'renamenx', 'mget', 'rpoplpush', 'sdiff', 'sdiffstore', 'sinter', 'sinterstore', 'sunion', 'sunionstore', 'subscribe', 'punsubscribe', 'subscribe', 'unsubscribe', 'watch', ); return array_merge( array_fill_keys($cmdSingleKey, $singleKey), array_fill_keys($cmdMultiKeys, $multiKeys), array( 'blpop' => $skipLast, 'blpop' => $skipLast, 'brpoplpush' => $skipLast, 'smove' => $skipLast, 'mset' => $interleavedKeys, 'msetnx' => $interleavedKeys, 'zinterstore' => $zunionstore, 'zunionstore' => $zunionstore, 'sort' => $sort, 'debug' => $debug ) ); } public function setPrefixStrategy($command, $strategy) { if (!is_callable($callable)) { throw new \InvalidArgumentException( 'The command preprocessor strategy must be a callable object' ); } $this->_strategies[$command] = $strategy; } public function getPrefixStrategy($command) { if (isset($this->_strategies[$command])) { return $this->_strategies[$command]; } } public function setPrefix($prefix) { $this->_prefix = $prefix; } public function getPrefix() { return $this->_prefix; } public function process($method, &$arguments) { if (isset($this->_strategies[$method])) { $this->_strategies[$method]($arguments, $this->_prefix); } } } add($preprocessor); } } public function add(ICommandPreprocessor $preprocessor) { $this->_preprocessors[] = $preprocessor; } public function remove(ICommandPreprocessor $preprocessor) { $index = array_search($preprocessor, $this->_preprocessors, true); if ($index !== false) { unset($this->_preprocessors); } } public function process($method, &$arguments) { $count = count($this->_preprocessors); for ($i = 0; $i < $count; $i++) { $this->_preprocessors[$i]->process($method, $arguments); } } public function getPreprocessors() { return $this->_preprocessors; } public function getIterator() { return new \ArrayIterator($this->_preprocessors); } public function count() { return count($this->_preprocessors); } public function offsetExists($index) { return isset($this->_preprocessors[$index]); } public function offsetGet($index) { return $this->_preprocessors[$index]; } public function offsetSet($index, $preprocessor) { if (!$preprocessor instanceof ICommandPreprocessor) { throw new \InvalidArgumentException( 'A preprocessor chain can hold only instances of classes implementing '. 'the Predis\Commands\Preprocessors\ICommandPreprocessor interface' ); } $this->_preprocessors[$index] = $preprocessor; } public function offsetUnset($index) { unset($this->_preprocessors[$index]); } } checkSameHashForKeys($this->getArguments()); } } checkSameHashForKeys($this->getArguments()); } } $v) { $flattenedKVs[] = $k; $flattenedKVs[] = $v; } return $flattenedKVs; } return $arguments; } protected function canBeHashed() { $args = $this->getArguments(); $keys = array(); for ($i = 0; $i < count($args); $i += 2) { $keys[] = $args[$i]; } return $this->checkSameHashForKeys($keys); } } true); $lastType = 'array'; } if ($lastType === 'array') { $options = $this->prepareOptions(array_pop($arguments)); return array_merge($arguments, $options); } } return $arguments; } protected function prepareOptions($options) { $opts = array_change_key_case($options, CASE_UPPER); $finalizedOpts = array(); if (isset($opts['WITHSCORES'])) { $finalizedOpts[] = 'WITHSCORES'; $this->_withScores = true; } return $finalizedOpts; } public function parseResponse($data) { if ($this->_withScores) { if ($data instanceof \Iterator) { return new MultiBulkResponseTuple($data); } $result = array(); for ($i = 0; $i < count($data); $i++) { $result[] = array($data[$i], $data[++$i]); } return $result; } return $data; } } 2 && is_array($arguments[$argc - 1])) { $options = $this->prepareOptions(array_pop($arguments)); } if (is_array($arguments[1])) { $arguments = array_merge( array($arguments[0], count($arguments[1])), $arguments[1] ); } return array_merge($arguments, $options); } private function prepareOptions($options) { $opts = array_change_key_case($options, CASE_UPPER); $finalizedOpts = array(); if (isset($opts['WEIGHTS']) && is_array($opts['WEIGHTS'])) { $finalizedOpts[] = 'WEIGHTS'; foreach ($opts['WEIGHTS'] as $weight) { $finalizedOpts[] = $weight; } } if (isset($opts['AGGREGATE'])) { $finalizedOpts[] = 'AGGREGATE'; $finalizedOpts[] = $opts['AGGREGATE']; } return $finalizedOpts; } protected function canBeHashed() { $args = $this->getArguments(); return $this->checkSameHashForKeys( array_merge(array($args[0]), array_slice($args, 2, $args[1])) ); } } _connection = $connection; parent::__construct($message, $code, $innerException); } public function getConnection() { return $this->_connection; } public function shouldResetConnection() { return true; } } _instanceSchemes = $schemesMap; } } private static function ensureDefaultSchemes() { if (!isset(self::$_globalSchemes)) { self::$_globalSchemes = array( 'tcp' => '\Predis\Network\StreamConnection', 'unix' => '\Predis\Network\StreamConnection', ); } } private static function checkConnectionClass($class) { $connectionReflection = new \ReflectionClass($class); if (!$connectionReflection->isSubclassOf('\Predis\Network\IConnectionSingle')) { throw new ClientException( "The class '$class' is not a valid connection class" ); } } public static function define($scheme, $connectionClass) { self::ensureDefaultSchemes(); self::checkConnectionClass($connectionClass); self::$_globalSchemes[$scheme] = $connectionClass; } public function create($parameters) { if (!$parameters instanceof IConnectionParameters) { $parameters = new ConnectionParameters($parameters); } $scheme = $parameters->scheme; if (isset($this->_instanceSchemes[$scheme])) { $connection = $this->_instanceSchemes[$scheme]; } else if (isset(self::$_globalSchemes[$scheme])) { $connection = self::$_globalSchemes[$scheme]; } else { throw new ClientException("Unknown connection scheme: $scheme"); } return new $connection($parameters); } } parseURI($parameters); } $this->_userDefined = array_keys($parameters); $this->_parameters = $this->filter($parameters) + self::$_defaultParameters; } private static function ensureDefaults() { if (!isset(self::$_defaultParameters)) { self::$_defaultParameters = array( 'scheme' => 'tcp', 'host' => '127.0.0.1', 'port' => 6379, 'database' => null, 'password' => null, 'connection_async' => false, 'connection_persistent' => false, 'connection_timeout' => 5.0, 'read_write_timeout' => null, 'alias' => null, 'weight' => null, 'path' => null, 'iterable_multibulk' => false, 'throw_errors' => true, ); } if (!isset(self::$_validators)) { $boolValidator = function($value) { return (bool) $value; }; $floatValidator = function($value) { return (float) $value; }; $intValidator = function($value) { return (int) $value; }; self::$_validators = array( 'port' => $intValidator, 'connection_async' => $boolValidator, 'connection_persistent' => $boolValidator, 'connection_timeout' => $floatValidator, 'read_write_timeout' => $floatValidator, 'iterable_multibulk' => $boolValidator, 'throw_errors' => $boolValidator, ); } } public static function define($parameter, $default, $callable = null) { self::ensureDefaults(); self::$_defaultParameters[$parameter] = $default; if ($default instanceof IOption) { self::$_validators[$parameter] = $default; return; } if (!isset($callable)) { unset(self::$_validators[$parameter]); return; } if (!is_callable($callable)) { throw new \InvalidArgumentException( "The validator for $parameter must be a callable object" ); } self::$_validators[$parameter] = $callable; } public static function undefine($parameter) { self::ensureDefaults(); unset(self::$_defaultParameters[$parameter], self::$_validators[$parameter]); } private function parseURI($uri) { if (stripos($uri, 'unix') === 0) { // Hack to support URIs for UNIX sockets with minimal effort. $uri = str_ireplace('unix:///', 'unix://localhost/', $uri); } if (($parsed = @parse_url($uri)) === false || !isset($parsed['host'])) { throw new ClientException("Invalid URI: $uri"); } if (isset($parsed['query'])) { foreach (explode('&', $parsed['query']) as $kv) { @list($k, $v) = explode('=', $kv); $parsed[$k] = $v; } unset($parsed['query']); } return $parsed; } private function filter(Array $parameters) { if (count($parameters) > 0) { $validators = array_intersect_key(self::$_validators, $parameters); foreach ($validators as $parameter => $validator) { $parameters[$parameter] = $validator($parameters[$parameter]); } } return $parameters; } public function __get($parameter) { $value = $this->_parameters[$parameter]; if ($value instanceof IOption) { $this->_parameters[$parameter] = ($value = $value->getDefault()); } return $value; } public function __isset($parameter) { return isset($this->_parameters[$parameter]); } public function setByUser($parameter) { return in_array($parameter, $this->_userDefined); } protected function getBaseURI() { if ($this->scheme === 'unix') { return "{$this->scheme}://{$this->path}"; } return "{$this->scheme}://{$this->host}:{$this->port}"; } protected function getDisallowedURIParts() { return array('scheme', 'host', 'port', 'password', 'path'); } public function toArray() { return $this->_parameters; } public function __toString() { $query = array(); $parameters = $this->toArray(); $reject = $this->getDisallowedURIParts(); foreach ($this->_userDefined as $param) { if (in_array($param, $reject) || !isset($parameters[$param])) { continue; } $value = $parameters[$param]; $query[] = "$param=" . ($value === false ? '0' : $value); } if (count($query) === 0) { return $this->getBaseURI(); } return $this->getBaseURI() . '/?' . implode('&', $query); } public function __sleep() { return array('_parameters', '_userDefined'); } public function __wakeup() { self::ensureDefaults(); } } _replicas = $replicas; $this->_nodes = array(); } public function add($node, $weight = null) { // In case of collisions in the hashes of the nodes, the node added // last wins, thus the order in which nodes are added is significant. $this->_nodes[] = array('object' => $node, 'weight' => (int) $weight ?: $this::DEFAULT_WEIGHT); $this->reset(); } public function remove($node) { // A node is removed by resetting the ring so that it's recreated from // scratch, in order to reassign possible hashes with collisions to the // right node according to the order in which they were added in the // first place. for ($i = 0; $i < count($this->_nodes); ++$i) { if ($this->_nodes[$i]['object'] === $node) { array_splice($this->_nodes, $i, 1); $this->reset(); break; } } } private function reset() { unset($this->_ring); unset($this->_ringKeys); unset($this->_ringKeysCount); } private function isInitialized() { return isset($this->_ringKeys); } private function computeTotalWeight() { $totalWeight = 0; foreach ($this->_nodes as $node) { $totalWeight += $node['weight']; } return $totalWeight; } private function initialize() { if ($this->isInitialized()) { return; } if (count($this->_nodes) === 0) { throw new EmptyRingException('Cannot initialize empty hashring'); } $this->_ring = array(); $totalWeight = $this->computeTotalWeight(); $nodesCount = count($this->_nodes); foreach ($this->_nodes as $node) { $weightRatio = $node['weight'] / $totalWeight; $this->addNodeToRing($this->_ring, $node, $nodesCount, $this->_replicas, $weightRatio); } ksort($this->_ring, SORT_NUMERIC); $this->_ringKeys = array_keys($this->_ring); $this->_ringKeysCount = count($this->_ringKeys); } protected function addNodeToRing(&$ring, $node, $totalNodes, $replicas, $weightRatio) { $nodeObject = $node['object']; $nodeHash = (string) $nodeObject; $replicas = (int) round($weightRatio * $totalNodes * $replicas); for ($i = 0; $i < $replicas; $i++) { $key = crc32("$nodeHash:$i"); $ring[$key] = $nodeObject; } } public function generateKey($value) { return crc32($value); } public function get($key) { return $this->_ring[$this->getNodeKey($key)]; } private function getNodeKey($key) { $this->initialize(); $ringKeys = $this->_ringKeys; $upper = $this->_ringKeysCount - 1; $lower = 0; while ($lower <= $upper) { $index = ($lower + $upper) >> 1; $item = $ringKeys[$index]; if ($item > $key) { $upper = $index - 1; } else if ($item < $key) { $lower = $index + 1; } else { return $item; } } return $ringKeys[$this->wrapAroundStrategy($upper, $lower, $this->_ringKeysCount)]; } protected function wrapAroundStrategy($upper, $lower, $ringKeysCount) { // Binary search for the last item in _ringkeys with a value less or // equal to the key. If no such item exists, return the last item. return $upper >= 0 ? $upper : $ringKeysCount - 1; } } shouldResetConnection()) { $connection = $exception->getConnection(); if ($connection->isConnected()) { $connection->disconnect(); } } throw $exception; } public static function filterArrayArguments(Array $arguments) { if (count($arguments) === 1 && is_array($arguments[0])) { return $arguments[0]; } return $arguments; } } _current; } public function key() { return $this->_position; } public function next() { if (++$this->_position < $this->_replySize) { $this->_current = $this->getValue(); } return $this->_position; } public function valid() { return $this->_position < $this->_replySize; } public function count() { // Use count if you want to get the size of the current multi-bulk // response without using iterator_count (which actually consumes our // iterator to calculate the size, and we cannot perform a rewind) return $this->_replySize; } protected abstract function getValue(); } _connection = $connection; $this->_position = 0; $this->_current = $size > 0 ? $this->getValue() : null; $this->_replySize = $size; } public function __destruct() { // When the iterator is garbage-collected (e.g. it goes out of the // scope of a foreach) but it has not reached its end, we must sync // the client with the queued elements that have not been read from // the connection with the server. $this->sync(); } public function sync($drop = false) { if ($drop == true) { if ($this->valid()) { $this->_position = $this->_replySize; $this->_connection->disconnect(); } } else { while ($this->valid()) { $this->next(); } } } protected function getValue() { return $this->_connection->read(); } } _iterator = $iterator; $this->_position = 0; $this->_current = $virtualSize > 0 ? $this->getValue() : null; $this->_replySize = $virtualSize; } public function __destruct() { $this->_iterator->sync(); } protected function getValue() { $k = $this->_iterator->current(); $this->_iterator->next(); $v = $this->_iterator->current(); $this->_iterator->next(); return array($k, $v); } } checkCapabilities($client); $this->_options = $options ?: array(); $this->_client = $client; $this->reset(); } private function checkCapabilities(Client $client) { if (Helpers::isCluster($client->getConnection())) { throw new ClientException( 'Cannot initialize a MULTI/EXEC context over a cluster of connections' ); } $profile = $client->getProfile(); if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) { throw new ClientException( 'The current profile does not support MULTI, EXEC and DISCARD commands' ); } $this->_supportsWatch = $profile->supportsCommands(array('watch', 'unwatch')); } private function isWatchSupported() { if ($this->_supportsWatch === false) { throw new ClientException( 'The current profile does not support WATCH and UNWATCH commands' ); } } private function reset() { $this->_initialized = false; $this->_discarded = false; $this->_checkAndSet = false; $this->_insideBlock = false; $this->_watchedKeys = false; $this->_commands = array(); } private function initialize() { if ($this->_initialized === true) { return; } $options = $this->_options; $this->_checkAndSet = isset($options['cas']) && $options['cas']; if (isset($options['watch'])) { $this->watch($options['watch']); } if (!$this->_checkAndSet || ($this->_discarded && $this->_checkAndSet)) { $this->_client->multi(); if ($this->_discarded) { $this->_checkAndSet = false; } } $this->_initialized = true; $this->_discarded = false; } public function __call($method, $arguments) { $this->initialize(); $client = $this->_client; if ($this->_checkAndSet) { return call_user_func_array(array($client, $method), $arguments); } $command = $client->createCommand($method, $arguments); $response = $client->executeCommand($command); if (!$response instanceof ResponseQueued) { $this->onProtocolError('The server did not respond with a QUEUED status reply'); } $this->_commands[] = $command; return $this; } public function watch($keys) { $this->isWatchSupported(); $this->_watchedKeys = true; if ($this->_initialized && !$this->_checkAndSet) { throw new ClientException('WATCH inside MULTI is not allowed'); } return $this->_client->watch($keys); } public function multi() { if ($this->_initialized && $this->_checkAndSet) { $this->_checkAndSet = false; $this->_client->multi(); return $this; } $this->initialize(); return $this; } public function unwatch() { $this->isWatchSupported(); $this->_watchedKeys = false; $this->_client->unwatch(); return $this; } public function discard() { if ($this->_initialized === true) { $command = $this->_checkAndSet ? 'unwatch' : 'discard'; $this->_client->$command(); $this->reset(); $this->_discarded = true; } return $this; } public function exec() { return $this->execute(); } private function checkBeforeExecution($block) { if ($this->_insideBlock === true) { throw new ClientException( "Cannot invoke 'execute' or 'exec' inside an active client transaction block" ); } if ($block) { if (!is_callable($block)) { throw new \InvalidArgumentException( 'Argument passed must be a callable object' ); } if (count($this->_commands) > 0) { $this->discard(); throw new ClientException( 'Cannot execute a transaction block after using fluent interface' ); } } if (isset($this->_options['retry']) && !isset($block)) { $this->discard(); throw new \InvalidArgumentException( 'Automatic retries can be used only when a transaction block is provided' ); } } public function execute($block = null) { $this->checkBeforeExecution($block); $reply = null; $returnValues = array(); $attemptsLeft = isset($this->_options['retry']) ? (int)$this->_options['retry'] : 0; do { $blockException = null; if ($block !== null) { $this->_insideBlock = true; try { $block($this); } catch (CommunicationException $exception) { $blockException = $exception; } catch (ServerException $exception) { $blockException = $exception; } catch (\Exception $exception) { $blockException = $exception; $this->discard(); } $this->_insideBlock = false; if ($blockException !== null) { throw $blockException; } } if (count($this->_commands) === 0) { if ($this->_watchedKeys) { $this->discard(); return; } return; } $reply = $this->_client->exec(); if ($reply === null) { if ($attemptsLeft === 0) { throw new AbortedMultiExec( 'The current transaction has been aborted by the server' ); } $this->reset(); if (isset($this->_options['on_retry']) && is_callable($this->_options['on_retry'])) { call_user_func($this->_options['on_retry'], $this, $attemptsLeft); } continue; } break; } while ($attemptsLeft-- > 0); $execReply = $reply instanceof \Iterator ? iterator_to_array($reply) : $reply; $sizeofReplies = count($execReply); $commands = &$this->_commands; if ($sizeofReplies !== count($commands)) { $this->onProtocolError('Unexpected number of responses for a MultiExecContext'); } for ($i = 0; $i < $sizeofReplies; $i++) { $returnValues[] = $commands[$i]->parseResponse($execReply[$i] instanceof \Iterator ? iterator_to_array($execReply[$i]) : $execReply[$i] ); unset($commands[$i]); } return $returnValues; } private function onProtocolError($message) { // Since a MULTI/EXEC block cannot be initialized over a clustered // connection, we can safely assume that Predis\Client::getConnection() // will always return an instance of Predis\Network\IConnectionSingle. Helpers::onCommunicationException(new ProtocolException( $this->_client->getConnection(), $message )); } } setProtocol($protocol ?: new TextProtocol()); parent::__construct($parameters); } protected function initializeProtocol(IConnectionParameters $parameters) { $this->_protocol->setOption('throw_errors', $parameters->throw_errors); $this->_protocol->setOption('iterable_multibulk', $parameters->iterable_multibulk); } public function setProtocol(IProtocolProcessor $protocol) { if ($protocol === null) { throw new \InvalidArgumentException("The protocol instance cannot be a null value"); } $this->_protocol = $protocol; } public function getProtocol() { return $this->_protocol; } public function writeBytes($buffer) { parent::writeBytes($buffer); } public function readBytes($length) { if ($length <= 0) { throw new \InvalidArgumentException('Length parameter must be greater than 0'); } $socket = $this->getResource(); $value = ''; do { $chunk = fread($socket, $length); if ($chunk === false || $chunk === '') { $this->onConnectionError('Error while reading bytes from the server'); } $value .= $chunk; } while (($length -= strlen($chunk)) > 0); return $value; } public function readLine() { $socket = $this->getResource(); $value = ''; do { $chunk = fgets($socket); if ($chunk === false || $chunk === '') { $this->onConnectionError('Error while reading line from the server'); } $value .= $chunk; } while (substr($value, -2) !== "\r\n"); return substr($value, 0, -2); } public function writeCommand(ICommand $command) { $this->_protocol->write($this, $command); } public function read() { return $this->_protocol->read($this); } } _initCmds = array(); $this->_params = $this->checkParameters($parameters); $this->initializeProtocol($parameters); } public function __destruct() { $this->disconnect(); } protected function checkParameters(IConnectionParameters $parameters) { switch ($parameters->scheme) { case 'unix': $pathToSocket = $parameters->path; if (!isset($pathToSocket)) { throw new InvalidArgumentException('Missing UNIX domain socket path'); } if (!file_exists($pathToSocket)) { throw new InvalidArgumentException("Could not find $pathToSocket"); } case 'tcp': return $parameters; default: throw new InvalidArgumentException("Invalid scheme: {$parameters->scheme}"); } return $parameters; } protected function initializeProtocol(IConnectionParameters $parameters) { // NOOP } protected abstract function createResource(); public function isConnected() { return isset($this->_resource); } public function connect() { if ($this->isConnected()) { throw new ClientException('Connection already estabilished'); } $this->_resource = $this->createResource(); } public function disconnect() { unset($this->_resource); } public function pushInitCommand(ICommand $command) { $this->_initCmds[] = $command; } public function executeCommand(ICommand $command) { $this->writeCommand($command); return $this->readResponse($command); } public function readResponse(ICommand $command) { $reply = $this->read(); if (isset($reply->skipParse)) { return $reply; } return $command->parseResponse($reply); } protected function onConnectionError($message, $code = null) { Helpers::onCommunicationException( new ConnectionException($this, $message, $code) ); } protected function onProtocolError($message) { Helpers::onCommunicationException( new ProtocolException($this, $message) ); } protected function onInvalidOption($option, $parameters = null) { $message = "Invalid option: $option"; if (isset($parameters)) { $message .= " [$parameters]"; } throw new InvalidArgumentException($message); } public function getResource() { if (isset($this->_resource)) { return $this->_resource; } $this->connect(); return $this->_resource; } public function getParameters() { return $this->_params; } protected function getIdentifier() { if ($this->_params->scheme === 'unix') { return $this->_params->path; } return "{$this->_params->host}:{$this->_params->port}"; } public function __toString() { if (!isset($this->_cachedId)) { $this->_cachedId = $this->getIdentifier(); } return $this->_cachedId; } } _pool = array(); $this->_distributor = $distributor ?: new Distribution\HashRing(); } public function isConnected() { foreach ($this->_pool as $connection) { if ($connection->isConnected()) { return true; } } return false; } public function connect() { foreach ($this->_pool as $connection) { $connection->connect(); } } public function disconnect() { foreach ($this->_pool as $connection) { $connection->disconnect(); } } public function add(IConnectionSingle $connection) { $parameters = $connection->getParameters(); if (isset($parameters->alias)) { $this->_pool[$parameters->alias] = $connection; } else { $this->_pool[] = $connection; } $this->_distributor->add($connection, $parameters->weight); } public function getConnection(ICommand $command) { $cmdHash = $command->getHash($this->_distributor); if (isset($cmdHash)) { return $this->_distributor->get($cmdHash); } throw new ClientException( sprintf("Cannot send '%s' commands to a cluster of connections", $command->getId()) ); } public function getConnectionById($id = null) { $alias = $id ?: 0; return isset($this->_pool[$alias]) ? $this->_pool[$alias] : null; } public function getIterator() { return new \ArrayIterator($this->_pool); } public function writeCommand(ICommand $command) { $this->getConnection($command)->writeCommand($command); } public function readResponse(ICommand $command) { return $this->getConnection($command)->readResponse($command); } public function executeCommand(ICommand $command) { return $this->getConnection($command)->executeCommand($command); } } _reader); parent::__destruct(); } protected function checkParameters(IConnectionParameters $parameters) { if (isset($parameters->iterable_multibulk)) { $this->onInvalidOption('iterable_multibulk', $parameters); } if (isset($parameters->connection_persistent)) { $this->onInvalidOption('connection_persistent', $parameters); } return parent::checkParameters($parameters); } private function initializeReader($throw_errors = true) { if (!function_exists('phpiredis_reader_create')) { throw new ClientException( 'The phpiredis extension must be loaded in order to be able to ' . 'use this connection class' ); } $reader = phpiredis_reader_create(); phpiredis_reader_set_status_handler($reader, $this->getStatusHandler()); phpiredis_reader_set_error_handler($reader, $this->getErrorHandler($throw_errors)); $this->_reader = $reader; } protected function initializeProtocol(IConnectionParameters $parameters) { $this->initializeReader($parameters->throw_errors); } private function getStatusHandler() { return function($payload) { switch ($payload) { case 'OK': return true; case 'QUEUED': return new ResponseQueued(); default: return $payload; } }; } private function getErrorHandler($throwErrors = true) { if ($throwErrors) { return function($errorMessage) { throw new ServerException(substr($errorMessage, 4)); }; } return function($errorMessage) { return new ResponseError(substr($errorMessage, 4)); }; } private function emitSocketError() { $errno = socket_last_error(); $errstr = socket_strerror($errno); $this->disconnect(); $this->onConnectionError(trim($errstr), $errno); } protected function createResource() { $parameters = $this->_params; $initializer = array($this, "{$parameters->scheme}SocketInitializer"); $socket = call_user_func($initializer, $parameters); $this->setSocketOptions($socket, $parameters); return $socket; } private function tcpSocketInitializer(IConnectionParameters $parameters) { $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if (!is_resource($socket)) { $this->emitSocketError(); } return $socket; } private function unixSocketInitializer(IConnectionParameters $parameters) { $socket = @socket_create(AF_UNIX, SOCK_STREAM, 0); if (!is_resource($socket)) { $this->emitSocketError(); } return $socket; } private function setSocketOptions($socket, IConnectionParameters $parameters) { if ($parameters->scheme !== 'tcp') { return; } if (!socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1)) { $this->emitSocketError(); } if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) { $this->emitSocketError(); } if (isset($parameters->read_write_timeout)) { $rwtimeout = $parameters->read_write_timeout; $timeoutSec = floor($rwtimeout); $timeoutUsec = ($rwtimeout - $timeoutSec) * 1000000; $timeout = array('sec' => $timeoutSec, 'usec' => $timeoutUsec); if (!socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, $timeout)) { $this->emitSocketError(); } if (!socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, $timeout)) { $this->emitSocketError(); } } } private function getAddress(IConnectionParameters $parameters) { if ($parameters->scheme === 'unix') { return $parameters->path; } $host = $parameters->host; if (ip2long($host) === false) { if (($address = gethostbyname($host)) === $host) { $this->onConnectionError("Cannot resolve the address of $host"); } return $address; } return $host; } private function connectWithTimeout(IConnectionParameters $parameters) { $host = self::getAddress($parameters); $socket = $this->getResource(); socket_set_nonblock($socket); if (@socket_connect($socket, $host, $parameters->port) === false) { $error = socket_last_error(); if ($error != SOCKET_EINPROGRESS && $error != SOCKET_EALREADY) { $this->emitSocketError(); } } socket_set_block($socket); $null = null; $selectable = array($socket); $timeout = $parameters->connection_timeout; $timeoutSecs = floor($timeout); $timeoutUSecs = ($timeout - $timeoutSecs) * 1000000; $selected = socket_select($selectable, $selectable, $null, $timeoutSecs, $timeoutUSecs); if ($selected === 2) { $this->onConnectionError('Connection refused', SOCKET_ECONNREFUSED); } if ($selected === 0) { $this->onConnectionError('Connection timed out', SOCKET_ETIMEDOUT); } if ($selected === false) { $this->emitSocketError(); } } public function connect() { parent::connect(); $this->connectWithTimeout($this->_params); if (count($this->_initCmds) > 0) { $this->sendInitializationCommands(); } } public function disconnect() { if ($this->isConnected()) { socket_close($this->getResource()); parent::disconnect(); } } private function sendInitializationCommands() { foreach ($this->_initCmds as $command) { $this->writeCommand($command); } foreach ($this->_initCmds as $command) { $this->readResponse($command); } } private function write($buffer) { $socket = $this->getResource(); while (($length = strlen($buffer)) > 0) { $written = socket_write($socket, $buffer, $length); if ($length === $written) { return; } if ($written === false) { $this->onConnectionError('Error while writing bytes to the server'); } $buffer = substr($buffer, $written); } } public function read() { $socket = $this->getResource(); $reader = $this->_reader; while (($state = phpiredis_reader_get_state($reader)) === PHPIREDIS_READER_STATE_INCOMPLETE) { if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '') { $this->emitSocketError(); } phpiredis_reader_feed($reader, $buffer); } if ($state === PHPIREDIS_READER_STATE_COMPLETE) { return phpiredis_reader_get_reply($reader); } else { $this->onProtocolError(phpiredis_reader_get_error($reader)); } } public function writeCommand(ICommand $command) { $cmdargs = $command->getArguments(); array_unshift($cmdargs, $command->getId()); $this->write(phpiredis_format_command($cmdargs)); } } _params->connection_persistent) { $this->disconnect(); } } protected function initializeProtocol(IConnectionParameters $parameters) { $this->_throwErrors = $parameters->throw_errors; $this->_mbiterable = $parameters->iterable_multibulk; } protected function createResource() { $parameters = $this->_params; $initializer = "{$parameters->scheme}StreamInitializer"; return $this->$initializer($parameters); } private function tcpStreamInitializer(IConnectionParameters $parameters) { $uri = "tcp://{$parameters->host}:{$parameters->port}/"; $flags = STREAM_CLIENT_CONNECT; if ($parameters->connection_async) { $flags |= STREAM_CLIENT_ASYNC_CONNECT; } if ($parameters->connection_persistent) { $flags |= STREAM_CLIENT_PERSISTENT; } $resource = @stream_socket_client( $uri, $errno, $errstr, $parameters->connection_timeout, $flags ); if (!$resource) { $this->onConnectionError(trim($errstr), $errno); } if (isset($parameters->read_write_timeout)) { $rwtimeout = $parameters->read_write_timeout; $rwtimeout = $rwtimeout > 0 ? $rwtimeout : -1; $timeoutSeconds = floor($rwtimeout); $timeoutUSeconds = ($rwtimeout - $timeoutSeconds) * 1000000; stream_set_timeout($resource, $timeoutSeconds, $timeoutUSeconds); } return $resource; } private function unixStreamInitializer(IConnectionParameters $parameters) { $uri = "unix://{$parameters->path}"; $flags = STREAM_CLIENT_CONNECT; if ($parameters->connection_persistent) { $flags |= STREAM_CLIENT_PERSISTENT; } $resource = @stream_socket_client( $uri, $errno, $errstr, $parameters->connection_timeout, $flags ); if (!$resource) { $this->onConnectionError(trim($errstr), $errno); } return $resource; } public function connect() { parent::connect(); if (count($this->_initCmds) > 0){ $this->sendInitializationCommands(); } } public function disconnect() { if ($this->isConnected()) { fclose($this->getResource()); parent::disconnect(); } } private function sendInitializationCommands() { foreach ($this->_initCmds as $command) { $this->writeCommand($command); } foreach ($this->_initCmds as $command) { $this->readResponse($command); } } protected function writeBytes($buffer) { $socket = $this->getResource(); while (($length = strlen($buffer)) > 0) { $written = fwrite($socket, $buffer); if ($length === $written) { return; } if ($written === false || $written === 0) { $this->onConnectionError('Error while writing bytes to the server'); } $value = substr($buffer, $written); } } public function read() { $socket = $this->getResource(); $chunk = fgets($socket); if ($chunk === false || $chunk === '') { $this->onConnectionError('Error while reading line from the server'); } $prefix = $chunk[0]; $payload = substr($chunk, 1, -2); switch ($prefix) { case '+': // inline switch ($payload) { case 'OK': return true; case 'QUEUED': return new ResponseQueued(); default: return $payload; } case '$': // bulk $size = (int) $payload; if ($size === -1) { return null; } $bulkData = ''; $bytesLeft = ($size += 2); do { $chunk = fread($socket, min($bytesLeft, 4096)); if ($chunk === false || $chunk === '') { $this->onConnectionError( 'Error while reading bytes from the server' ); } $bulkData .= $chunk; $bytesLeft = $size - strlen($bulkData); } while ($bytesLeft > 0); return substr($bulkData, 0, -2); case '*': // multi bulk $count = (int) $payload; if ($count === -1) { return null; } if ($this->_mbiterable === true) { return new MultiBulkResponseSimple($this, $count); } $multibulk = array(); for ($i = 0; $i < $count; $i++) { $multibulk[$i] = $this->read(); } return $multibulk; case ':': // integer return (int) $payload; case '-': // error $errorMessage = substr($payload, 4); if ($this->_throwErrors) { throw new ServerException($errorMessage); } return new ResponseError($errorMessage); default: $this->onProtocolError("Unknown prefix: '$prefix'"); } } public function writeCommand(ICommand $command) { $commandId = $command->getId(); $arguments = $command->getArguments(); $cmdlen = strlen($commandId); $reqlen = count($arguments) + 1; $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandId}\r\n"; for ($i = 0; $i < $reqlen - 1; $i++) { $argument = $arguments[$i]; $arglen = strlen($argument); $buffer .= "\${$arglen}\r\n{$argument}\r\n"; } $this->writeBytes($buffer); } } checkClass($value); } } private function checkClass($class) { $reflection = new \ReflectionClass($class); if (!$reflection->isSubclassOf(self::CLUSTER_INTERFACE)) { throw new ClientException( "The class $class is not a valid cluster connection" ); } return $class; } public function getDefault() { return self::CLUSTER_PREDIS; } } isSubclassOf('\Predis\Distribution\IDistributionStrategy')) { return new $value; } } throw new \InvalidArgumentException('Invalid value for key distribution'); } public function getDefault() { return new HashRing(); } } _validate = $this->filterCallable($options, 'validate'); $this->_default = $this->filterCallable($options, 'default'); } private function filterCallable($options, $key) { if (!isset($options[$key])) { return; } $callable = $options[$key]; if (is_callable($callable)) { return $callable; } throw new \InvalidArgumentException("The parameter $key must be callable"); } public function validate($value) { if (isset($value)) { if ($this->_validate === null) { return $value; } $validator = $this->_validate; return $validator($value); } } public function getDefault() { if (!isset($this->_default)) { return; } $default = $this->_default; return $default(); } public function __invoke($value) { if (isset($value)) { return $this->validate($value); } return $this->getDefault(); } } validate($value); } return $this->getDefault(); } } getConnection($command); if (isset($connectionExceptions[spl_object_hash($cmdConnection)])) { continue; } try { $cmdConnection->writeCommand($command); } catch (CommunicationException $exception) { $connectionExceptions[spl_object_hash($cmdConnection)] = $exception; } } for ($i = 0; $i < $sizeofPipe; $i++) { $command = $commands[$i]; unset($commands[$i]); $cmdConnection = $connection->getConnection($command); $connectionObjectHash = spl_object_hash($cmdConnection); if (isset($connectionExceptions[$connectionObjectHash])) { $values[] = $connectionExceptions[$connectionObjectHash]; continue; } try { $response = $cmdConnection->readResponse($command); $values[] = ($response instanceof \Iterator ? iterator_to_array($response) : $response ); } catch (ServerException $exception) { $values[] = $exception->toResponseError(); } catch (CommunicationException $exception) { $values[] = $exception; $connectionExceptions[$connectionObjectHash] = $exception; } } return $values; } } writeCommand($command); } catch (CommunicationException $exception) { return array_fill(0, $sizeofPipe, $exception); } } for ($i = 0; $i < $sizeofPipe; $i++) { $command = $commands[$i]; unset($commands[$i]); try { $response = $connection->readResponse($command); $values[] = ($response instanceof \Iterator ? iterator_to_array($response) : $response ); } catch (ServerException $exception) { $values[] = $exception->toResponseError(); } catch (CommunicationException $exception) { $toAdd = count($commands) - count($values); $values = array_merge($values, array_fill(0, $toAdd, $exception)); break; } } return $values; } } writeCommand($command); } try { for ($i = 0; $i < $sizeofPipe; $i++) { $response = $connection->readResponse($commands[$i]); $values[] = $response instanceof \Iterator ? iterator_to_array($response) : $response; unset($commands[$i]); } } catch (ServerException $exception) { // Force disconnection to prevent protocol desynchronization. $connection->disconnect(); throw $exception; } return $values; } } _client = $client; $this->_executor = $executor ?: new Pipeline\StandardExecutor(); $this->_pipelineBuffer = array(); $this->_returnValues = array(); } public function __call($method, $arguments) { $command = $this->_client->createCommand($method, $arguments); $this->recordCommand($command); return $this; } protected function recordCommand(ICommand $command) { $this->_pipelineBuffer[] = $command; } public function flushPipeline() { if (count($this->_pipelineBuffer) > 0) { $connection = $this->_client->getConnection(); $this->_returnValues = array_merge( $this->_returnValues, $this->_executor->execute($connection, $this->_pipelineBuffer) ); $this->_pipelineBuffer = array(); } return $this; } private function setRunning($bool) { if ($bool === true && $this->_running === true) { throw new ClientException("This pipeline is already opened"); } $this->_running = $bool; } public function execute($block = null) { if ($block && !is_callable($block)) { throw new \InvalidArgumentException('Argument passed must be a callable object'); } $this->setRunning(true); $pipelineBlockException = null; try { if ($block !== null) { $block($this); } $this->flushPipeline(); } catch (\Exception $exception) { $pipelineBlockException = $exception; } $this->setRunning(false); if ($pipelineBlockException !== null) { throw $pipelineBlockException; } return $this->_returnValues; } } _registeredCommands = $this->getSupportedCommands(); } protected abstract function getSupportedCommands(); public static function getDefault() { return self::get('default'); } public static function getDevelopment() { return self::get('dev'); } private static function getDefaultProfiles() { return array( '1.2' => '\Predis\Profiles\ServerVersion12', '2.0' => '\Predis\Profiles\ServerVersion20', '2.2' => '\Predis\Profiles\ServerVersion22', 'default' => '\Predis\Profiles\ServerVersion22', 'dev' => '\Predis\Profiles\ServerVersionNext', ); } public static function define($alias, $profileClass) { if (!isset(self::$_profiles)) { self::$_profiles = self::getDefaultProfiles(); } $profileReflection = new \ReflectionClass($profileClass); if (!$profileReflection->isSubclassOf('\Predis\Profiles\IServerProfile')) { throw new ClientException( "Cannot register '$profileClass' as it is not a valid profile class" ); } self::$_profiles[$alias] = $profileClass; } public static function get($version) { if (!isset(self::$_profiles)) { self::$_profiles = self::getDefaultProfiles(); } if (!isset(self::$_profiles[$version])) { throw new ClientException("Unknown server profile: $version"); } $profile = self::$_profiles[$version]; return new $profile(); } public function supportsCommands(Array $commands) { foreach ($commands as $command) { if ($this->supportsCommand($command) === false) { return false; } } return true; } public function supportsCommand($command) { return isset($this->_registeredCommands[$command]); } public function createCommand($method, $arguments = array()) { if (!isset($this->_registeredCommands[$method])) { throw new ClientException("'$method' is not a registered Redis command"); } if (isset($this->_preprocessor)) { $this->_preprocessor->process($method, $arguments); } $commandClass = $this->_registeredCommands[$method]; $command = new $commandClass(); $command->setArguments($arguments); return $command; } public function defineCommands(Array $commands) { foreach ($commands as $alias => $command) { $this->defineCommand($alias, $command); } } public function defineCommand($alias, $command) { $commandReflection = new \ReflectionClass($command); if (!$commandReflection->isSubclassOf('\Predis\Commands\ICommand')) { throw new ClientException("Cannot register '$command' as it is not a valid Redis command"); } $this->_registeredCommands[$alias] = $command; } public function setPreprocessor(ICommandPreprocessor $preprocessor) { if (!isset($preprocessor)) { unset($this->_preprocessor); return; } $this->_preprocessor = $preprocessor; } public function getPreprocessor() { return $this->_preprocessor; } public function __toString() { return $this->getVersion(); } } '\Predis\Commands\Ping', 'echo' => '\Predis\Commands\DoEcho', 'auth' => '\Predis\Commands\Auth', /* connection handling */ 'quit' => '\Predis\Commands\Quit', /* commands operating on string values */ 'set' => '\Predis\Commands\Set', 'setnx' => '\Predis\Commands\SetPreserve', 'mset' => '\Predis\Commands\SetMultiple', 'msetnx' => '\Predis\Commands\SetMultiplePreserve', 'get' => '\Predis\Commands\Get', 'mget' => '\Predis\Commands\GetMultiple', 'getset' => '\Predis\Commands\GetSet', 'incr' => '\Predis\Commands\Increment', 'incrby' => '\Predis\Commands\IncrementBy', 'decr' => '\Predis\Commands\Decrement', 'decrby' => '\Predis\Commands\DecrementBy', 'exists' => '\Predis\Commands\Exists', 'del' => '\Predis\Commands\Delete', 'type' => '\Predis\Commands\Type', /* commands operating on the key space */ 'keys' => '\Predis\Commands\KeysV12x', 'randomkey' => '\Predis\Commands\RandomKey', 'rename' => '\Predis\Commands\Rename', 'renamenx' => '\Predis\Commands\RenamePreserve', 'expire' => '\Predis\Commands\Expire', 'expireat' => '\Predis\Commands\ExpireAt', 'dbsize' => '\Predis\Commands\DatabaseSize', 'ttl' => '\Predis\Commands\TimeToLive', /* commands operating on lists */ 'rpush' => '\Predis\Commands\ListPushTail', 'lpush' => '\Predis\Commands\ListPushHead', 'llen' => '\Predis\Commands\ListLength', 'lrange' => '\Predis\Commands\ListRange', 'ltrim' => '\Predis\Commands\ListTrim', 'lindex' => '\Predis\Commands\ListIndex', 'lset' => '\Predis\Commands\ListSet', 'lrem' => '\Predis\Commands\ListRemove', 'lpop' => '\Predis\Commands\ListPopFirst', 'rpop' => '\Predis\Commands\ListPopLast', 'rpoplpush' => '\Predis\Commands\ListPopLastPushHead', /* commands operating on sets */ 'sadd' => '\Predis\Commands\SetAdd', 'srem' => '\Predis\Commands\SetRemove', 'spop' => '\Predis\Commands\SetPop', 'smove' => '\Predis\Commands\SetMove', 'scard' => '\Predis\Commands\SetCardinality', 'sismember' => '\Predis\Commands\SetIsMember', 'sinter' => '\Predis\Commands\SetIntersection', 'sinterstore' => '\Predis\Commands\SetIntersectionStore', 'sunion' => '\Predis\Commands\SetUnion', 'sunionstore' => '\Predis\Commands\SetUnionStore', 'sdiff' => '\Predis\Commands\SetDifference', 'sdiffstore' => '\Predis\Commands\SetDifferenceStore', 'smembers' => '\Predis\Commands\SetMembers', 'srandmember' => '\Predis\Commands\SetRandomMember', /* commands operating on sorted sets */ 'zadd' => '\Predis\Commands\ZSetAdd', 'zincrby' => '\Predis\Commands\ZSetIncrementBy', 'zrem' => '\Predis\Commands\ZSetRemove', 'zrange' => '\Predis\Commands\ZSetRange', 'zrevrange' => '\Predis\Commands\ZSetReverseRange', 'zrangebyscore' => '\Predis\Commands\ZSetRangeByScore', 'zcard' => '\Predis\Commands\ZSetCardinality', 'zscore' => '\Predis\Commands\ZSetScore', 'zremrangebyscore' => '\Predis\Commands\ZSetRemoveRangeByScore', /* multiple databases handling commands */ 'select' => '\Predis\Commands\SelectDatabase', 'move' => '\Predis\Commands\MoveKey', 'flushdb' => '\Predis\Commands\FlushDatabase', 'flushall' => '\Predis\Commands\FlushAll', /* sorting */ 'sort' => '\Predis\Commands\Sort', /* remote server control commands */ 'info' => '\Predis\Commands\Info', 'slaveof' => '\Predis\Commands\SlaveOf', /* persistence control commands */ 'save' => '\Predis\Commands\Save', 'bgsave' => '\Predis\Commands\BackgroundSave', 'lastsave' => '\Predis\Commands\LastSave', 'shutdown' => '\Predis\Commands\Shutdown', 'bgrewriteaof' => '\Predis\Commands\BackgroundRewriteAppendOnlyFile', ); } } '\Predis\Commands\Ping', 'echo' => '\Predis\Commands\DoEcho', 'auth' => '\Predis\Commands\Auth', /* connection handling */ 'quit' => '\Predis\Commands\Quit', /* commands operating on string values */ 'set' => '\Predis\Commands\Set', 'setnx' => '\Predis\Commands\SetPreserve', 'mset' => '\Predis\Commands\SetMultiple', 'msetnx' => '\Predis\Commands\SetMultiplePreserve', 'get' => '\Predis\Commands\Get', 'mget' => '\Predis\Commands\GetMultiple', 'getset' => '\Predis\Commands\GetSet', 'incr' => '\Predis\Commands\Increment', 'incrby' => '\Predis\Commands\IncrementBy', 'decr' => '\Predis\Commands\Decrement', 'decrby' => '\Predis\Commands\DecrementBy', 'exists' => '\Predis\Commands\Exists', 'del' => '\Predis\Commands\Delete', 'type' => '\Predis\Commands\Type', /* commands operating on the key space */ 'keys' => '\Predis\Commands\Keys', 'randomkey' => '\Predis\Commands\RandomKey', 'rename' => '\Predis\Commands\Rename', 'renamenx' => '\Predis\Commands\RenamePreserve', 'expire' => '\Predis\Commands\Expire', 'expireat' => '\Predis\Commands\ExpireAt', 'dbsize' => '\Predis\Commands\DatabaseSize', 'ttl' => '\Predis\Commands\TimeToLive', /* commands operating on lists */ 'rpush' => '\Predis\Commands\ListPushTail', 'lpush' => '\Predis\Commands\ListPushHead', 'llen' => '\Predis\Commands\ListLength', 'lrange' => '\Predis\Commands\ListRange', 'ltrim' => '\Predis\Commands\ListTrim', 'lindex' => '\Predis\Commands\ListIndex', 'lset' => '\Predis\Commands\ListSet', 'lrem' => '\Predis\Commands\ListRemove', 'lpop' => '\Predis\Commands\ListPopFirst', 'rpop' => '\Predis\Commands\ListPopLast', 'rpoplpush' => '\Predis\Commands\ListPopLastPushHead', /* commands operating on sets */ 'sadd' => '\Predis\Commands\SetAdd', 'srem' => '\Predis\Commands\SetRemove', 'spop' => '\Predis\Commands\SetPop', 'smove' => '\Predis\Commands\SetMove', 'scard' => '\Predis\Commands\SetCardinality', 'sismember' => '\Predis\Commands\SetIsMember', 'sinter' => '\Predis\Commands\SetIntersection', 'sinterstore' => '\Predis\Commands\SetIntersectionStore', 'sunion' => '\Predis\Commands\SetUnion', 'sunionstore' => '\Predis\Commands\SetUnionStore', 'sdiff' => '\Predis\Commands\SetDifference', 'sdiffstore' => '\Predis\Commands\SetDifferenceStore', 'smembers' => '\Predis\Commands\SetMembers', 'srandmember' => '\Predis\Commands\SetRandomMember', /* commands operating on sorted sets */ 'zadd' => '\Predis\Commands\ZSetAdd', 'zincrby' => '\Predis\Commands\ZSetIncrementBy', 'zrem' => '\Predis\Commands\ZSetRemove', 'zrange' => '\Predis\Commands\ZSetRange', 'zrevrange' => '\Predis\Commands\ZSetReverseRange', 'zrangebyscore' => '\Predis\Commands\ZSetRangeByScore', 'zcard' => '\Predis\Commands\ZSetCardinality', 'zscore' => '\Predis\Commands\ZSetScore', 'zremrangebyscore' => '\Predis\Commands\ZSetRemoveRangeByScore', /* multiple databases handling commands */ 'select' => '\Predis\Commands\SelectDatabase', 'move' => '\Predis\Commands\MoveKey', 'flushdb' => '\Predis\Commands\FlushDatabase', 'flushall' => '\Predis\Commands\FlushAll', /* sorting */ 'sort' => '\Predis\Commands\Sort', /* remote server control commands */ 'info' => '\Predis\Commands\Info', 'slaveof' => '\Predis\Commands\SlaveOf', /* persistence control commands */ 'save' => '\Predis\Commands\Save', 'bgsave' => '\Predis\Commands\BackgroundSave', 'lastsave' => '\Predis\Commands\LastSave', 'shutdown' => '\Predis\Commands\Shutdown', 'bgrewriteaof' => '\Predis\Commands\BackgroundRewriteAppendOnlyFile', /* ---------------- Redis 2.0 ---------------- */ /* transactions */ 'multi' => '\Predis\Commands\Multi', 'exec' => '\Predis\Commands\Exec', 'discard' => '\Predis\Commands\Discard', /* commands operating on string values */ 'setex' => '\Predis\Commands\SetExpire', 'append' => '\Predis\Commands\Append', 'substr' => '\Predis\Commands\Substr', /* commands operating on lists */ 'blpop' => '\Predis\Commands\ListPopFirstBlocking', 'brpop' => '\Predis\Commands\ListPopLastBlocking', /* commands operating on sorted sets */ 'zunionstore' => '\Predis\Commands\ZSetUnionStore', 'zinterstore' => '\Predis\Commands\ZSetIntersectionStore', 'zcount' => '\Predis\Commands\ZSetCount', 'zrank' => '\Predis\Commands\ZSetRank', 'zrevrank' => '\Predis\Commands\ZSetReverseRank', 'zremrangebyrank' => '\Predis\Commands\ZSetRemoveRangeByRank', /* commands operating on hashes */ 'hset' => '\Predis\Commands\HashSet', 'hsetnx' => '\Predis\Commands\HashSetPreserve', 'hmset' => '\Predis\Commands\HashSetMultiple', 'hincrby' => '\Predis\Commands\HashIncrementBy', 'hget' => '\Predis\Commands\HashGet', 'hmget' => '\Predis\Commands\HashGetMultiple', 'hdel' => '\Predis\Commands\HashDelete', 'hexists' => '\Predis\Commands\HashExists', 'hlen' => '\Predis\Commands\HashLength', 'hkeys' => '\Predis\Commands\HashKeys', 'hvals' => '\Predis\Commands\HashValues', 'hgetall' => '\Predis\Commands\HashGetAll', /* publish - subscribe */ 'subscribe' => '\Predis\Commands\Subscribe', 'unsubscribe' => '\Predis\Commands\Unsubscribe', 'psubscribe' => '\Predis\Commands\SubscribeByPattern', 'punsubscribe' => '\Predis\Commands\UnsubscribeByPattern', 'publish' => '\Predis\Commands\Publish', /* remote server control commands */ 'config' => '\Predis\Commands\Config', ); } } '\Predis\Commands\Ping', 'echo' => '\Predis\Commands\DoEcho', 'auth' => '\Predis\Commands\Auth', /* connection handling */ 'quit' => '\Predis\Commands\Quit', /* commands operating on string values */ 'set' => '\Predis\Commands\Set', 'setnx' => '\Predis\Commands\SetPreserve', 'mset' => '\Predis\Commands\SetMultiple', 'msetnx' => '\Predis\Commands\SetMultiplePreserve', 'get' => '\Predis\Commands\Get', 'mget' => '\Predis\Commands\GetMultiple', 'getset' => '\Predis\Commands\GetSet', 'incr' => '\Predis\Commands\Increment', 'incrby' => '\Predis\Commands\IncrementBy', 'decr' => '\Predis\Commands\Decrement', 'decrby' => '\Predis\Commands\DecrementBy', 'exists' => '\Predis\Commands\Exists', 'del' => '\Predis\Commands\Delete', 'type' => '\Predis\Commands\Type', /* commands operating on the key space */ 'keys' => '\Predis\Commands\Keys', 'randomkey' => '\Predis\Commands\RandomKey', 'rename' => '\Predis\Commands\Rename', 'renamenx' => '\Predis\Commands\RenamePreserve', 'expire' => '\Predis\Commands\Expire', 'expireat' => '\Predis\Commands\ExpireAt', 'dbsize' => '\Predis\Commands\DatabaseSize', 'ttl' => '\Predis\Commands\TimeToLive', /* commands operating on lists */ 'rpush' => '\Predis\Commands\ListPushTail', 'lpush' => '\Predis\Commands\ListPushHead', 'llen' => '\Predis\Commands\ListLength', 'lrange' => '\Predis\Commands\ListRange', 'ltrim' => '\Predis\Commands\ListTrim', 'lindex' => '\Predis\Commands\ListIndex', 'lset' => '\Predis\Commands\ListSet', 'lrem' => '\Predis\Commands\ListRemove', 'lpop' => '\Predis\Commands\ListPopFirst', 'rpop' => '\Predis\Commands\ListPopLast', 'rpoplpush' => '\Predis\Commands\ListPopLastPushHead', /* commands operating on sets */ 'sadd' => '\Predis\Commands\SetAdd', 'srem' => '\Predis\Commands\SetRemove', 'spop' => '\Predis\Commands\SetPop', 'smove' => '\Predis\Commands\SetMove', 'scard' => '\Predis\Commands\SetCardinality', 'sismember' => '\Predis\Commands\SetIsMember', 'sinter' => '\Predis\Commands\SetIntersection', 'sinterstore' => '\Predis\Commands\SetIntersectionStore', 'sunion' => '\Predis\Commands\SetUnion', 'sunionstore' => '\Predis\Commands\SetUnionStore', 'sdiff' => '\Predis\Commands\SetDifference', 'sdiffstore' => '\Predis\Commands\SetDifferenceStore', 'smembers' => '\Predis\Commands\SetMembers', 'srandmember' => '\Predis\Commands\SetRandomMember', /* commands operating on sorted sets */ 'zadd' => '\Predis\Commands\ZSetAdd', 'zincrby' => '\Predis\Commands\ZSetIncrementBy', 'zrem' => '\Predis\Commands\ZSetRemove', 'zrange' => '\Predis\Commands\ZSetRange', 'zrevrange' => '\Predis\Commands\ZSetReverseRange', 'zrangebyscore' => '\Predis\Commands\ZSetRangeByScore', 'zcard' => '\Predis\Commands\ZSetCardinality', 'zscore' => '\Predis\Commands\ZSetScore', 'zremrangebyscore' => '\Predis\Commands\ZSetRemoveRangeByScore', /* multiple databases handling commands */ 'select' => '\Predis\Commands\SelectDatabase', 'move' => '\Predis\Commands\MoveKey', 'flushdb' => '\Predis\Commands\FlushDatabase', 'flushall' => '\Predis\Commands\FlushAll', /* sorting */ 'sort' => '\Predis\Commands\Sort', /* remote server control commands */ 'info' => '\Predis\Commands\Info', 'slaveof' => '\Predis\Commands\SlaveOf', /* persistence control commands */ 'save' => '\Predis\Commands\Save', 'bgsave' => '\Predis\Commands\BackgroundSave', 'lastsave' => '\Predis\Commands\LastSave', 'shutdown' => '\Predis\Commands\Shutdown', 'bgrewriteaof' => '\Predis\Commands\BackgroundRewriteAppendOnlyFile', /* ---------------- Redis 2.0 ---------------- */ /* transactions */ 'multi' => '\Predis\Commands\Multi', 'exec' => '\Predis\Commands\Exec', 'discard' => '\Predis\Commands\Discard', /* commands operating on string values */ 'setex' => '\Predis\Commands\SetExpire', 'append' => '\Predis\Commands\Append', 'substr' => '\Predis\Commands\Substr', /* commands operating on lists */ 'blpop' => '\Predis\Commands\ListPopFirstBlocking', 'brpop' => '\Predis\Commands\ListPopLastBlocking', /* commands operating on sorted sets */ 'zunionstore' => '\Predis\Commands\ZSetUnionStore', 'zinterstore' => '\Predis\Commands\ZSetIntersectionStore', 'zcount' => '\Predis\Commands\ZSetCount', 'zrank' => '\Predis\Commands\ZSetRank', 'zrevrank' => '\Predis\Commands\ZSetReverseRank', 'zremrangebyrank' => '\Predis\Commands\ZSetRemoveRangeByRank', /* commands operating on hashes */ 'hset' => '\Predis\Commands\HashSet', 'hsetnx' => '\Predis\Commands\HashSetPreserve', 'hmset' => '\Predis\Commands\HashSetMultiple', 'hincrby' => '\Predis\Commands\HashIncrementBy', 'hget' => '\Predis\Commands\HashGet', 'hmget' => '\Predis\Commands\HashGetMultiple', 'hdel' => '\Predis\Commands\HashDelete', 'hexists' => '\Predis\Commands\HashExists', 'hlen' => '\Predis\Commands\HashLength', 'hkeys' => '\Predis\Commands\HashKeys', 'hvals' => '\Predis\Commands\HashValues', 'hgetall' => '\Predis\Commands\HashGetAll', /* publish - subscribe */ 'subscribe' => '\Predis\Commands\Subscribe', 'unsubscribe' => '\Predis\Commands\Unsubscribe', 'psubscribe' => '\Predis\Commands\SubscribeByPattern', 'punsubscribe' => '\Predis\Commands\UnsubscribeByPattern', 'publish' => '\Predis\Commands\Publish', /* remote server control commands */ 'config' => '\Predis\Commands\Config', /* ---------------- Redis 2.2 ---------------- */ /* transactions */ 'watch' => '\Predis\Commands\Watch', 'unwatch' => '\Predis\Commands\Unwatch', /* commands operating on string values */ 'strlen' => '\Predis\Commands\Strlen', 'setrange' => '\Predis\Commands\SetRange', 'getrange' => '\Predis\Commands\GetRange', 'setbit' => '\Predis\Commands\SetBit', 'getbit' => '\Predis\Commands\GetBit', /* commands operating on the key space */ 'persist' => '\Predis\Commands\Persist', /* commands operating on lists */ 'rpushx' => '\Predis\Commands\ListPushTailX', 'lpushx' => '\Predis\Commands\ListPushHeadX', 'linsert' => '\Predis\Commands\ListInsert', 'brpoplpush' => '\Predis\Commands\ListPopLastPushHeadBlocking', /* commands operating on sorted sets */ 'zrevrangebyscore' => '\Predis\Commands\ZSetReverseRangeByScore', /* remote server control commands */ 'object' => '\Predis\Commands\DebugObject', ); } } '\Predis\Commands\ListPushTailV24x', 'lpush' => '\Predis\Commands\ListPushHeadV24x', /* commands operating on sets */ 'sadd' => '\Predis\Commands\SetAddV24x', /* commands operating on hashes */ 'hdel' => '\Predis\Commands\HashDeleteV24x', /* remote server control commands */ 'info' => '\Predis\Commands\InfoV24x', )); } } setSerializer(new TextCommandSerializer()); $this->setReader(new TextResponseReader()); if (count($options) > 0) { $this->initializeOptions($options); } } private function initializeOptions(Array $options) { foreach ($options as $k => $v) { $this->setOption($k, $v); } } public function setOption($option, $value) { switch ($option) { case 'iterable_multibulk': $handler = $value ? new ResponseMultiBulkStreamHandler() : new ResponseMultiBulkHandler(); $this->_reader->setHandler(TextProtocol::PREFIX_MULTI_BULK, $handler); break; case 'throw_errors': $handler = $value ? new ResponseErrorHandler() : new ResponseErrorSilentHandler(); $this->_reader->setHandler(TextProtocol::PREFIX_ERROR, $handler); break; default: throw new \InvalidArgumentException( "The option $option is not supported by the current protocol" ); } } public function serialize(ICommand $command) { return $this->_serializer->serialize($command); } public function write(IConnectionComposable $connection, ICommand $command) { $connection->writeBytes($this->_serializer->serialize($command)); } public function read(IConnectionComposable $connection) { return $this->_reader->read($connection); } public function setSerializer(ICommandSerializer $serializer) { $this->_serializer = $serializer; } public function getSerializer() { return $this->_serializer; } public function setReader(IResponseReader $reader) { $this->_reader = $reader; } public function getReader() { return $this->_reader; } } = 0) { return substr($connection->readBytes($length + 2), 0, -2); } if ($length == -1) { return null; } } } 0) { $handlersCache = array(); $reader = $connection->getProtocol()->getReader(); for ($i = 0; $i < $length; $i++) { $header = $connection->readLine(); $prefix = $header[0]; if (isset($handlersCache[$prefix])) { $handler = $handlersCache[$prefix]; } else { $handler = $reader->getHandler($prefix); $handlersCache[$prefix] = $handler; } $list[$i] = $handler->handle($connection, substr($header, 1)); } } return $list; } } getId(); $arguments = $command->getArguments(); $cmdlen = strlen($commandId); $reqlen = count($arguments) + 1; $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandId}\r\n"; for ($i = 0; $i < $reqlen - 1; $i++) { $argument = $arguments[$i]; $arglen = strlen($argument); $buffer .= "\${$arglen}\r\n{$argument}\r\n"; } return $buffer; } } _mbiterable = false; $this->_throwErrors = true; $this->_serializer = new TextCommandSerializer(); } public function write(IConnectionComposable $connection, ICommand $command) { $connection->writeBytes($this->_serializer->serialize($command)); } public function read(IConnectionComposable $connection) { $chunk = $connection->readLine(); $prefix = $chunk[0]; $payload = substr($chunk, 1); switch ($prefix) { case '+': // inline switch ($payload) { case 'OK': return true; case 'QUEUED': return new ResponseQueued(); default: return $payload; } case '$': // bulk $size = (int) $payload; if ($size === -1) { return null; } return substr($connection->readBytes($size + 2), 0, -2); case '*': // multi bulk $count = (int) $payload; if ($count === -1) { return null; } if ($this->_mbiterable == true) { return new MultiBulkResponseSimple($connection, $count); } $multibulk = array(); for ($i = 0; $i < $count; $i++) { $multibulk[$i] = $this->read($connection); } return $multibulk; case ':': // integer return (int) $payload; case '-': // error $errorMessage = substr($payload, 4); if ($this->_throwErrors) { throw new ServerException($errorMessage); } return new ResponseError($errorMessage); default: Helpers::onCommunicationException(new ProtocolException( $connection, "Unknown prefix: '$prefix'" )); } } public function setOption($option, $value) { switch ($option) { case 'iterable_multibulk': $this->_mbiterable = (bool) $value; break; case 'throw_errors': $this->_throwErrors = (bool) $value; break; } } } _prefixHandlers = $this->getDefaultHandlers(); } private function getDefaultHandlers() { return array( TextProtocol::PREFIX_STATUS => new ResponseStatusHandler(), TextProtocol::PREFIX_ERROR => new ResponseErrorHandler(), TextProtocol::PREFIX_INTEGER => new ResponseIntegerHandler(), TextProtocol::PREFIX_BULK => new ResponseBulkHandler(), TextProtocol::PREFIX_MULTI_BULK => new ResponseMultiBulkHandler(), ); } public function setHandler($prefix, IResponseHandler $handler) { $this->_prefixHandlers[$prefix] = $handler; } public function getHandler($prefix) { if (isset($this->_prefixHandlers[$prefix])) { return $this->_prefixHandlers[$prefix]; } } public function read(IConnectionComposable $connection) { $header = $connection->readLine(); if ($header === '') { $this->protocolError($connection, 'Unexpected empty header'); } $prefix = $header[0]; if (!isset($this->_prefixHandlers[$prefix])) { $this->throwMalformedResponse($connection, "Unknown prefix '$prefix'"); } $handler = $this->_prefixHandlers[$prefix]; return $handler->handle($connection, substr($header, 1)); } private function protocolError(IConnectionComposable $connection, $message) { Helpers::onCommunicationException(new ProtocolException($connection, $message)); } } checkCapabilities($client); $this->_options = $options ?: array(); $this->_client = $client; $this->_statusFlags = self::STATUS_VALID; $this->genericSubscribeInit('subscribe'); $this->genericSubscribeInit('psubscribe'); } public function __destruct() { $this->closeContext(); } private function checkCapabilities(Client $client) { if (Helpers::isCluster($client->getConnection())) { throw new ClientException( 'Cannot initialize a PUB/SUB context over a cluster of connections' ); } $commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe'); if ($client->getProfile()->supportsCommands($commands) === false) { throw new ClientException( 'The current profile does not support PUB/SUB related commands' ); } } private function genericSubscribeInit($subscribeAction) { if (isset($this->_options[$subscribeAction])) { $this->$subscribeAction($this->_options[$subscribeAction]); } } private function isFlagSet($value) { return ($this->_statusFlags & $value) === $value; } public function subscribe(/* arguments */) { $this->writeCommand(self::SUBSCRIBE, func_get_args()); $this->_statusFlags |= self::STATUS_SUBSCRIBED; } public function unsubscribe(/* arguments */) { $this->writeCommand(self::UNSUBSCRIBE, func_get_args()); } public function psubscribe(/* arguments */) { $this->writeCommand(self::PSUBSCRIBE, func_get_args()); $this->_statusFlags |= self::STATUS_PSUBSCRIBED; } public function punsubscribe(/* arguments */) { $this->writeCommand(self::PUNSUBSCRIBE, func_get_args()); } public function closeContext() { if ($this->valid()) { if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) { $this->unsubscribe(); } if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) { $this->punsubscribe(); } } } private function writeCommand($method, $arguments) { if (count($arguments) === 1 && is_array($arguments[0])) { $arguments = $arguments[0]; } $command = $this->_client->createCommand($method, $arguments); $this->_client->getConnection()->writeCommand($command); } public function rewind() { // NOOP } public function current() { return $this->getValue(); } public function key() { return $this->_position; } public function next() { if ($this->isFlagSet(self::STATUS_VALID)) { $this->_position++; } return $this->_position; } public function valid() { $isValid = $this->isFlagSet(self::STATUS_VALID); $subscriptionFlags = self::STATUS_SUBSCRIBED + self::STATUS_PSUBSCRIBED; $hasSubscriptions = ($this->_statusFlags & $subscriptionFlags) > 0; return $isValid && $hasSubscriptions; } private function invalidate() { $this->_statusFlags = 0x0000; } private function getValue() { $response = $this->_client->getConnection()->read(); switch ($response[0]) { case self::SUBSCRIBE: case self::UNSUBSCRIBE: case self::PSUBSCRIBE: case self::PUNSUBSCRIBE: if ($response[2] === 0) { $this->invalidate(); } case self::MESSAGE: return (object) array( 'kind' => $response[0], 'channel' => $response[1], 'payload' => $response[2], ); case self::PMESSAGE: return (object) array( 'kind' => $response[0], 'pattern' => $response[1], 'channel' => $response[2], 'payload' => $response[3], ); default: throw new ClientException( "Received an unknown message type {$response[0]} inside of a pubsub context" ); } } } _message = $message; } public function __get($property) { if ($property === 'error') { return true; } if ($property === 'message') { return $this->_message; } } public function __isset($property) { return $property === 'error'; } public function __toString() { return $this->_message; } } getMessage()); } } ]]prGYA8$GBMB