* @copyright 2011 Vasil Rangelov * @license http://www.gnu.org/copyleft/lesser.html LGPL License 2.1 * @version 1.0.0a5 * @link http://pear2.php.net/PEAR2_Net_Transmitter */ /** * The namespace declaration. */ namespace PEAR2\Net\Transmitter; use Exception as E; /** * A stream transmitter. * * This is a convinience wrapper for stream functionality. Used to ensure data * integrity. Designed for TCP sockets, but it has intentionally been made to * accept any stream. * * @category Net * @package PEAR2_Net_Transmitter * @author Vasil Rangelov * @license http://www.gnu.org/copyleft/lesser.html LGPL License 2.1 * @link http://pear2.php.net/PEAR2_Net_Transmitter */ class Stream { /** * Used to stop settings in either direction being applied. */ const DIRECTION_NONE = 0; /** * Used to apply settings only to receiving. */ const DIRECTION_RECEIVE = 1; /** * Used to apply settings only to sending. */ const DIRECTION_SEND = 2; /** * Used to apply settings to both sending and receiving. */ const DIRECTION_ALL = 3; /** * @var resource The stream to wrap around. */ protected $stream; /** * @var bool Whether to automaticaly close the stream on * object destruction if it's not a persistent one. Setting this to * FALSE may be useful if you're only using this class "part time", * while setting it to TRUE might be useful if you're doing some * "on offs". */ protected $autoClose = false; /** * @var bool A flag that tells whether or not the stream is persistent. */ protected $persist; /** * @var bool Whether the wrapped stream is in blocking mode or not. */ protected $isBlocking = true; /** * @var array An associative array with the chunk size of each direction. * Key is the direction, value is the size in bytes as integer. */ protected $chunkSize = array( self::DIRECTION_SEND => 0xFFFFF, self::DIRECTION_RECEIVE => 0xFFFFF ); /** * Wraps around the specified stream. * * @param resource $stream The stream to wrap around. * @param bool $autoClose Whether to automaticaly close the stream on * object destruction if it's not a persistent one. Setting this to * FALSE may be useful if you're only using this class "part time", * while setting it to TRUE might be useful if you're doing some * "on offs". * * @see static::isFresh() */ public function __construct($stream, $autoClose = false) { if (!self::isStream($stream)) { throw $this->createException('Invalid stream supplied.', 1); } $this->stream = $stream; $this->autoClose = (bool) $autoClose; $this->persist = (bool) preg_match( '#\s?persistent\s?#sm', get_resource_type($stream) ); $meta = stream_get_meta_data($stream); $this->isBlocking = isset($meta['blocked']) ? $meta['blocked'] : true; } /** * PHP error handler for connection errors. * * @param string $level Level of PHP error raised. Ignored. * @param string $message Message raised by PHP. * * @return void * @throws SocketException That's how the error is handled. * @SuppressWarnings(PHPMD.UnusedFormalParameter) */ protected function handleError($level, $message) { throw $this->createException($message, 0); } /** * Checks if a given variable is a stream resource. * * @param mixed $var The variable to check. * * @return bool TRUE on success, FALSE on failure. */ public static function isStream($var) { return is_resource($var) && (bool) preg_match('#\s?stream$#sm', get_resource_type($var)); } /** * Checks whether the wrapped stream is fresh. * * Checks whether the wrapped stream is fresh. A stream is considered fresh * if there hasn't been any activity on it. Particularly useful for * detecting reused persistent connections. * * @return bool TRUE if the socket is fresh, FALSE otherwise. */ public function isFresh() { return ftell($this->stream) === 0; } /** * Checks whether the wrapped stream is a persistent one. * * @return bool TRUE if the stream is a persistent one, FALSE otherwise. */ public function isPersistent() { return $this->persist; } /** * Checks whether the wrapped stream is a blocking one. * * @return bool TRUE if the stream is a blocking one, FALSE otherwise. */ public function isBlocking() { return $this->isBlocking; } /** * Sets blocking mode. * * @param bool $block Sets whether the stream is in blocking mode. * * @return bool TRUE on success, FALSE on failure. */ public function setIsBlocking($block) { $block = (bool)$block; if (stream_set_blocking($this->stream, (int)$block)) { $this->isBlocking = $block; return true; } return false; } /** * Sets the timeout for the stream. * * @param int $seconds Timeout in seconds. * @param int $microseconds Timeout in microseconds to be added to the * seconds. * * @return bool TRUE on success, FALSE on failure. */ public function setTimeout($seconds, $microseconds = 0) { return stream_set_timeout($this->stream, $seconds, $microseconds); } /** * Sets the size of a stream's buffer. * * @param int $size The desired size of the buffer, in bytes. * @param string $direction The buffer of which direction to set. Valid * values are the DIRECTION_* constants. * * @return bool TRUE on success, FALSE on failure. */ public function setBuffer($size, $direction = self::DIRECTION_ALL) { switch($direction) { case self::DIRECTION_SEND: return stream_set_write_buffer($this->stream, $size) === 0; case self::DIRECTION_RECEIVE: return stream_set_read_buffer($this->stream, $size) === 0; case self::DIRECTION_ALL: return $this->setBuffer($size, self::DIRECTION_RECEIVE) && $this->setBuffer($size, self::DIRECTION_SEND); } return false; } /** * Sets the size of the chunk. * * To ensure data integrity, as well as to allow for lower memory * consumption, data is sent/received in chunks. This function * allows you to set the size of each chunk. The default is 0xFFFFF. * * @param int $size The desired size of the chunk, in bytes. * @param string $direction The chunk of which direction to set. Valid * values are the DIRECTION_* constants. * * @return bool TRUE on success, FALSE on failure. */ public function setChunk($size, $direction = self::DIRECTION_ALL) { $size = (int) $size; if ($size <= 0) { return false; } switch($direction) { case self::DIRECTION_SEND: case self::DIRECTION_RECEIVE: $this->chunkSize[$direction] = $size; return true; case self::DIRECTION_ALL: $this->chunkSize[self::DIRECTION_SEND] = $this->chunkSize[self::DIRECTION_RECEIVE] = $size; return true; } return false; } /** * Gets the size of the chunk. * * @param string $direction The chunk of which direction to get. Valid * values are the DIRECTION_* constants. * * @return int|array|false The chunk size in bytes, * or an array of chunk sizes with the directions as keys. * FALSE on invalid direction. */ public function getChunk($direction = self::DIRECTION_ALL) { switch($direction) { case self::DIRECTION_SEND: case self::DIRECTION_RECEIVE: return $this->chunkSize[$direction]; case self::DIRECTION_ALL: return $this->chunkSize; } return false; } /** * Sends a string or stream over the wrapped stream. * * Sends a string or stream over the wrapped stream. If a seekable stream is * provided, it will be seeked back to the same position it was passed as, * regardless of the $offset parameter. * * @param string|resource $contents The string or stream to send. * @param int $offset The offset from which to start sending. * If a stream is provided, and this is set to NULL, sending will start * from the current stream position. * @param int $length The maximum length to send. If omitted, * the string/stream will be sent to its end. * * @return int The number of bytes sent. */ public function send($contents, $offset = null, $length = null) { $bytes = 0; $chunkSize = $this->chunkSize[self::DIRECTION_SEND]; $lengthIsNotNull = null !== $length; $offsetIsNotNull = null !== $offset; if (self::isStream($contents)) { if ($offsetIsNotNull) { $oldPos = ftell($contents); fseek($contents, $offset, SEEK_SET); } while (!feof($contents)) { if ($lengthIsNotNull && 0 === $chunkSize = min($chunkSize, $length - $bytes) ) { break; } $bytesNow = @fwrite( $this->stream, fread($contents, $chunkSize) ); if (0 != $bytesNow) { $bytes += $bytesNow; } elseif ($this->isBlocking || false === $bytesNow) { throw $this->createException( 'Failed while sending stream.', 2, null, $bytes ); } else { usleep(300000); } $this->isAcceptingData(null); } if ($offsetIsNotNull) { fseek($contents, $oldPos, SEEK_SET); } else { fseek($contents, -$bytes, SEEK_CUR); } } else { $contents = (string) $contents; if ($offsetIsNotNull) { $contents = substr($contents, $offset); } if ($lengthIsNotNull) { $contents = substr($contents, 0, $length); } $bytesToSend = (double) sprintf('%u', strlen($contents)); while ($bytes < $bytesToSend) { $bytesNow = @fwrite( $this->stream, substr($contents, $bytes, $chunkSize) ); if (0 != $bytesNow) { $bytes += $bytesNow; } elseif ($this->isBlocking || false === $bytesNow) { throw $this->createException( 'Failed while sending string.', 3, null, $bytes ); } else { usleep(300000); } $this->isAcceptingData(null); } } return $bytes; } /** * Reads from the wrapped stream to receive. * * Reads from the wrapped stream to receive content as a string. * * @param int $length The number of bytes to receive. * @param string $what Descriptive string about what is being received * (used in exception messages). * * @return string The received content. */ public function receive($length, $what = 'data') { $result = ''; $chunkSize = $this->chunkSize[self::DIRECTION_RECEIVE]; while ($length > 0) { while ($this->isAvailable()) { $fragment = fread($this->stream, min($length, $chunkSize)); if ('' != $fragment) { $length -= strlen($fragment); $result .= $fragment; continue 2; } elseif (!$this->isBlocking && !(false === $fragment)) { usleep(3000); continue 2; } } throw $this->createException( "Failed while receiving {$what}", 4, null, $result ); } return $result; } /** * Reads from the wrapped stream to receive. * * Reads from the wrapped stream to receive content as a stream. * * @param int $length The number of bytes to receive. * @param FilterCollection $filters A collection of filters to apply to the * stream while receiving. Note that the filters will not be present on * the stream after receiving is done. * @param string $what Descriptive string about what is being * received (used in exception messages). * * @return resource The received content. */ public function receiveStream( $length, FilterCollection $filters = null, $what = 'stream data' ) { $result = fopen('php://temp', 'r+b'); $appliedFilters = array(); if (null !== $filters) { foreach ($filters as $filtername => $params) { $appliedFilters[] = stream_filter_append( $result, $filtername, STREAM_FILTER_WRITE, $params ); } } $chunkSize = $this->chunkSize[self::DIRECTION_RECEIVE]; while ($length > 0) { while ($this->isAvailable()) { $fragment = fread($this->stream, min($length, $chunkSize)); if ('' != $fragment) { $length -= strlen($fragment); fwrite($result, $fragment); continue 2; } elseif (!$this->isBlocking && !(false === $fragment)) { usleep(3000); continue 2; } } foreach ($appliedFilters as $filter) { stream_filter_remove($filter); } rewind($result); throw $this->createException( "Failed while receiving {$what}", 5, null, $result ); } foreach ($appliedFilters as $filter) { stream_filter_remove($filter); } rewind($result); return $result; } /** * Checks whether the stream is available for operations. * * For network streams, this means whether the other end has closed the * connection. * * @return bool TRUE if the stream is available, FALSE otherwise. */ public function isAvailable() { return self::isStream($this->stream) && !feof($this->stream); } /** * Checks whether there is data to be read from the wrapped stream. * * @param int|null $sTimeout If theere isn't data awaiting currently, * wait for it this many seconds for data to arrive. If NULL is * specified, wait indefinetly for that. * @param int $usTimeout Microseconds to add to the waiting time. * * @return bool TRUE if there is data to be read, FALSE otherwise. * @SuppressWarnings(PHPMD.ShortVariable) */ public function isDataAwaiting($sTimeout = 0, $usTimeout = 0) { if (self::isStream($this->stream)) { if (null === $sTimeout && !$this->isBlocking) { $meta = stream_get_meta_data($this->stream); return !$meta['eof']; } $w = $e = null; $r = array($this->stream); return 1 === @/* due to PHP bug #54563 */stream_select( $r, $w, $e, $sTimeout, $usTimeout ); } return false; } /** * Checks whether the wrapped stream can be written to without a block. * * @param int|null $sTimeout If the stream isn't currently accepting data, * wait for it this many seconds to start accepting data. If NULL is * specified, wait indefinetly for that. * @param int $usTimeout Microseconds to add to the waiting time. * * @return bool TRUE if the wrapped stream would not block on a write, FALSE * otherwise. * @SuppressWarnings(PHPMD.ShortVariable) */ public function isAcceptingData($sTimeout = 0, $usTimeout = 0) { if (self::isStream($this->stream)) { if (!$this->isBlocking) { $meta = stream_get_meta_data($this->stream); return !$meta['eof']; } elseif (feof($this->stream)) { return false; } $r = $e = null; $w = array($this->stream); return 1 === @/* due to PHP bug #54563 */stream_select( $r, $w, $e, $sTimeout, $usTimeout ); } return false; } /** * Closes the opened stream, unless it's a persistent one. */ public function __destruct() { if ((!$this->persist) && $this->autoClose) { $this->close(); } } /** * Closes the opened stream, even if it is a persistent one. * * @return bool TRUE on success, FALSE on failure. */ public function close() { return self::isStream($this->stream) && fclose($this->stream); } /** * Creates a new exception. * * Creates a new exception. Used by the rest of the functions in this class. * Override in derived classes for custom exception handling. * * @param string $message The exception message. * @param int $code The exception code. * @param E|null $previous Previous exception thrown, * or NULL if there is none. * @param int|string|resource|null $fragment The fragment up until the * point of failure. * On failure with sending, this is the number of bytes sent * successfully before the failure. * On failure when receiving, this is a string/stream holding * the contents received successfully before the failure. * * @return StreamException The exception to then be thrown. */ protected function createException( $message, $code = 0, E $previous = null, $fragment = null ) { return new StreamException($message, $code, $previous, $fragment); } }