mirror of
https://github.com/the-djmaze/snappymail.git
synced 2025-11-13 16:53:12 +08:00
337 lines
8.8 KiB
PHP
337 lines
8.8 KiB
PHP
<?php
|
|
|
|
/*
|
|
* This file is part of the Predis package.
|
|
*
|
|
* (c) 2009-2020 Daniele Alessandri
|
|
* (c) 2021-2023 Till Krüss
|
|
*
|
|
* For the full copyright and license information, please view the LICENSE
|
|
* file that was distributed with this source code.
|
|
*/
|
|
|
|
namespace Predis\Connection;
|
|
|
|
use InvalidArgumentException;
|
|
use Predis\ClientException;
|
|
use Predis\Command\CommandInterface;
|
|
use Predis\NotSupportedException;
|
|
use Predis\Response\ServerException;
|
|
use Relay\Exception as RelayException;
|
|
use Relay\Relay;
|
|
|
|
/**
|
|
* This class provides the implementation of a Predis connection that
|
|
* uses Relay for network communication and in-memory caching.
|
|
*
|
|
* Using Relay allows for:
|
|
* 1) significantly faster reads thanks to in-memory caching
|
|
* 2) fast data serialization using igbinary
|
|
* 3) fast data compression using lzf, lz4 or zstd
|
|
*
|
|
* Usage of igbinary serialization and zstd compresses reduces
|
|
* network traffic and Redis memory usage by ~75%.
|
|
*
|
|
* For instructions on how to install the Relay extension, please consult
|
|
* the repository of the project: https://relay.so/docs/installation
|
|
*
|
|
* The connection parameters supported by this class are:
|
|
*
|
|
* - scheme: it can be either 'tcp', 'tls' or 'unix'.
|
|
* - host: hostname or IP address of the server.
|
|
* - port: TCP port of the server.
|
|
* - path: path of a UNIX domain socket when scheme is 'unix'.
|
|
* - timeout: timeout to perform the connection.
|
|
* - read_write_timeout: timeout of read / write operations.
|
|
* - cache: whether to use in-memory caching
|
|
* - serializer: data serializer
|
|
* - compression: data compression algorithm
|
|
*
|
|
* @see https://github.com/cachewerk/relay
|
|
*/
|
|
class RelayConnection extends StreamConnection
|
|
{
|
|
use RelayMethods;
|
|
|
|
/**
|
|
* The Relay instance.
|
|
*
|
|
* @var \Relay\Relay
|
|
*/
|
|
protected $client;
|
|
|
|
/**
|
|
* These commands must be called on the client, not using `Relay::rawCommand()`.
|
|
*
|
|
* @var string[]
|
|
*/
|
|
public $atypicalCommands = [
|
|
'AUTH',
|
|
'SELECT',
|
|
|
|
'TYPE',
|
|
|
|
'MULTI',
|
|
'EXEC',
|
|
'DISCARD',
|
|
|
|
'WATCH',
|
|
'UNWATCH',
|
|
|
|
'SUBSCRIBE',
|
|
'UNSUBSCRIBE',
|
|
'PSUBSCRIBE',
|
|
'PUNSUBSCRIBE',
|
|
'SSUBSCRIBE',
|
|
'SUNSUBSCRIBE',
|
|
];
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function __construct(ParametersInterface $parameters)
|
|
{
|
|
$this->assertExtensions();
|
|
|
|
$this->parameters = $this->assertParameters($parameters);
|
|
$this->client = $this->createClient();
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function isConnected()
|
|
{
|
|
return $this->client->isConnected();
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function disconnect()
|
|
{
|
|
if ($this->client->isConnected()) {
|
|
$this->client->close();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Checks if the Relay extension is loaded in PHP.
|
|
*/
|
|
private function assertExtensions()
|
|
{
|
|
if (!extension_loaded('relay')) {
|
|
throw new NotSupportedException(
|
|
'The "relay" extension is required by this connection backend.'
|
|
);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
protected function assertParameters(ParametersInterface $parameters)
|
|
{
|
|
if (!in_array($parameters->scheme, ['tcp', 'tls', 'unix', 'redis', 'rediss'])) {
|
|
throw new InvalidArgumentException("Invalid scheme: '{$parameters->scheme}'.");
|
|
}
|
|
|
|
if (!in_array($parameters->serializer, [null, 'php', 'igbinary', 'msgpack', 'json'])) {
|
|
throw new InvalidArgumentException("Invalid serializer: '{$parameters->serializer}'.");
|
|
}
|
|
|
|
if (!in_array($parameters->compression, [null, 'lzf', 'lz4', 'zstd'])) {
|
|
throw new InvalidArgumentException("Invalid compression algorithm: '{$parameters->compression}'.");
|
|
}
|
|
|
|
return $parameters;
|
|
}
|
|
|
|
/**
|
|
* Creates a new instance of the client.
|
|
*
|
|
* @return \Relay\Relay
|
|
*/
|
|
private function createClient()
|
|
{
|
|
$client = new Relay();
|
|
|
|
// throw when errors occur and return `null` for non-existent keys
|
|
$client->setOption(Relay::OPT_PHPREDIS_COMPATIBILITY, false);
|
|
|
|
// use reply literals
|
|
$client->setOption(Relay::OPT_REPLY_LITERAL, true);
|
|
|
|
// disable Relay's command/connection retry
|
|
$client->setOption(Relay::OPT_MAX_RETRIES, 0);
|
|
|
|
// whether to use in-memory caching
|
|
$client->setOption(Relay::OPT_USE_CACHE, $this->parameters->cache ?? true);
|
|
|
|
// set data serializer
|
|
$client->setOption(Relay::OPT_SERIALIZER, constant(sprintf(
|
|
'%s::SERIALIZER_%s',
|
|
Relay::class,
|
|
strtoupper($this->parameters->serializer ?? 'none')
|
|
)));
|
|
|
|
// set data compression algorithm
|
|
$client->setOption(Relay::OPT_COMPRESSION, constant(sprintf(
|
|
'%s::COMPRESSION_%s',
|
|
Relay::class,
|
|
strtoupper($this->parameters->compression ?? 'none')
|
|
)));
|
|
|
|
return $client;
|
|
}
|
|
|
|
/**
|
|
* Returns the underlying client.
|
|
*
|
|
* @return \Relay\Relay
|
|
*/
|
|
public function getClient()
|
|
{
|
|
return $this->client;
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
protected function getIdentifier()
|
|
{
|
|
return $this->client->endpointId();
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
protected function createStreamSocket(ParametersInterface $parameters, $address, $flags)
|
|
{
|
|
$timeout = isset($parameters->timeout) ? (float) $parameters->timeout : 5.0;
|
|
|
|
$retry_interval = 0;
|
|
$read_timeout = 5.0;
|
|
|
|
if (isset($parameters->read_write_timeout)) {
|
|
$read_timeout = (float) $parameters->read_write_timeout;
|
|
$read_timeout = $read_timeout > 0 ? $read_timeout : 0;
|
|
}
|
|
|
|
try {
|
|
$this->client->connect(
|
|
$parameters->path ?? $parameters->host,
|
|
isset($parameters->path) ? 0 : $parameters->port,
|
|
$timeout,
|
|
null,
|
|
$retry_interval,
|
|
$read_timeout
|
|
);
|
|
} catch (RelayException $ex) {
|
|
$this->onConnectionError($ex->getMessage(), $ex->getCode());
|
|
}
|
|
|
|
return $this->client;
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function executeCommand(CommandInterface $command)
|
|
{
|
|
if (!$this->client->isConnected()) {
|
|
$this->getResource();
|
|
}
|
|
|
|
try {
|
|
$name = $command->getId();
|
|
|
|
// When using compression or a serializer, we'll need a dedicated
|
|
// handler for `Predis\Command\RawCommand` calls, currently both
|
|
// parameters are unsupported until a future Relay release
|
|
return in_array($name, $this->atypicalCommands)
|
|
? $this->client->{$name}(...$command->getArguments())
|
|
: $this->client->rawCommand($name, ...$command->getArguments());
|
|
} catch (RelayException $ex) {
|
|
throw $this->onCommandError($ex, $command);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function onCommandError(RelayException $exception, CommandInterface $command)
|
|
{
|
|
$code = $exception->getCode();
|
|
$message = $exception->getMessage();
|
|
|
|
if (strpos($message, 'RELAY_ERR_IO')) {
|
|
return new ConnectionException($this, $message, $code, $exception);
|
|
}
|
|
|
|
if (strpos($message, 'RELAY_ERR_REDIS')) {
|
|
return new ServerException($message, $code, $exception);
|
|
}
|
|
|
|
if (strpos($message, 'RELAY_ERR_WRONGTYPE') && strpos($message, "Got reply-type 'status'")) {
|
|
$message = 'Operation against a key holding the wrong kind of value';
|
|
}
|
|
|
|
return new ClientException($message, $code, $exception);
|
|
}
|
|
|
|
/**
|
|
* Applies the configured serializer and compression to given value.
|
|
*
|
|
* @param mixed $value
|
|
* @return string
|
|
*/
|
|
public function pack($value)
|
|
{
|
|
return $this->client->_pack($value);
|
|
}
|
|
|
|
/**
|
|
* Deserializes and decompresses to given value.
|
|
*
|
|
* @param mixed $value
|
|
* @return string
|
|
*/
|
|
public function unpack($value)
|
|
{
|
|
return $this->client->_unpack($value);
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function writeRequest(CommandInterface $command)
|
|
{
|
|
throw new NotSupportedException('The "relay" extension does not support writing requests.');
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function readResponse(CommandInterface $command)
|
|
{
|
|
throw new NotSupportedException('The "relay" extension does not support reading responses.');
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function __destruct()
|
|
{
|
|
$this->disconnect();
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function __wakeup()
|
|
{
|
|
$this->assertExtensions();
|
|
$this->client = $this->createClient();
|
|
}
|
|
}
|