Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions src/Connector/ConnectionContext.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
<?php
namespace ScriptFUSION\Porter\Connector;

use ScriptFUSION\Porter\Connector\FetchExceptionHandler\FetchExceptionHandler;
use ScriptFUSION\Porter\Connector\FetchExceptionHandler\StatelessFetchExceptionHandler;

/**
* Specifies runtime connection settings and provides utility methods.
*/
Expand All @@ -11,20 +14,20 @@ final class ConnectionContext
/**
* User-defined exception handler called when a recoverable exception is thrown by Connector::fetch().
*
* @var callable
* @var FetchExceptionHandler
*/
private $fetchExceptionHandler;

/**
* Provider-defined exception handler called when a recoverable exception is thrown by Connector::fetch().
* Resource-defined exception handler called when a recoverable exception is thrown by Connector::fetch().
*
* @var callable
* @var FetchExceptionHandler
*/
private $providerFetchExceptionHandler;
private $resourceFetchExceptionHandler;

private $maxFetchAttempts;

public function __construct($mustCache, callable $fetchExceptionHandler, $maxFetchAttempts)
public function __construct($mustCache, FetchExceptionHandler $fetchExceptionHandler, $maxFetchAttempts)
{
$this->mustCache = (bool)$mustCache;
$this->fetchExceptionHandler = $fetchExceptionHandler;
Expand All @@ -50,39 +53,59 @@ public function mustCache()
*/
public function retry(callable $callback)
{
$userHandlerCloned = $providerHandlerCloned = false;

return \ScriptFUSION\Retry\retry(
$this->maxFetchAttempts,
$callback,
function (\Exception $exception) {
// Throw exception if unrecoverable.
function (\Exception $exception) use (&$userHandlerCloned, &$providerHandlerCloned) {
// Throw exception instead of retrying, if unrecoverable.
if (!$exception instanceof RecoverableConnectorException) {
throw $exception;
}

// Call provider's exception handler, if defined.
if ($this->providerFetchExceptionHandler) {
call_user_func($this->providerFetchExceptionHandler, $exception);
if ($this->resourceFetchExceptionHandler) {
self::invokeHandler($this->resourceFetchExceptionHandler, $exception, $providerHandlerCloned);
}

// TODO Clone exception handler to avoid persisting state between calls.
call_user_func($this->fetchExceptionHandler, $exception);
// Call user's exception handler.
self::invokeHandler($this->fetchExceptionHandler, $exception, $userHandlerCloned);
}
);
}

/**
* Invokes the specified fetch exception handler, cloning it if required.
*
* @param FetchExceptionHandler $handler Fetch exception handler.
* @param \Exception $exception Exception to pass to the handler.
* @param bool $cloned False if handler requires cloning, true if handler has already been cloned.
*/
private static function invokeHandler(FetchExceptionHandler &$handler, \Exception $exception, &$cloned)
{
if (!$cloned && !$handler instanceof StatelessFetchExceptionHandler) {
$handler = clone $handler;
$handler->initialize();
$cloned = true;
}

$handler($exception);
}

/**
* Sets an exception handler to be called when a recoverable exception is thrown by Connector::fetch().
*
* This handler is intended to be set by provider resources only and is called before the user-defined handler.
* The handler is intended to be set by provider resources only once and is called before the user-defined handler.
*
* @param callable $providerFetchExceptionHandler Exception handler.
* @param FetchExceptionHandler $resourceFetchExceptionHandler Exception handler.
*/
public function setProviderFetchExceptionHandler(callable $providerFetchExceptionHandler)
public function setResourceFetchExceptionHandler(FetchExceptionHandler $resourceFetchExceptionHandler)
{
if ($this->providerFetchExceptionHandler !== null) {
throw new \LogicException('Cannot set provider fetch exception handler: already set!');
if ($this->resourceFetchExceptionHandler !== null) {
throw new \LogicException('Cannot set resource fetch exception handler: already set!');
}

$this->providerFetchExceptionHandler = $providerFetchExceptionHandler;
$this->resourceFetchExceptionHandler = $resourceFetchExceptionHandler;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php
namespace ScriptFUSION\Porter\Connector\FetchExceptionHandler;

use ScriptFUSION\Retry\ExceptionHandler\ExponentialBackoffExceptionHandler;

/**
* Sleeps for an exponentially increasing series of delays specified in microseconds.
*/
class ExponentialSleepFetchExceptionHandler implements FetchExceptionHandler
{
private $handler;

public function initialize()
{
$this->handler = new ExponentialBackoffExceptionHandler;
}

public function __invoke(\Exception $exception)
{
call_user_func($this->handler, $exception);
}
}
34 changes: 34 additions & 0 deletions src/Connector/FetchExceptionHandler/FetchExceptionHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php
namespace ScriptFUSION\Porter\Connector\FetchExceptionHandler;

/**
* Provides methods for handling exceptions thrown by Connector::fetch().
*
* This interface supports a prototype cloning model that guarantees the object can be cloned and reset to its
* initial state at any time, any number of times. This is needed because a given import can spawn any number of
* subsequent fetches, some of which may execute concurrently, and all of which share the same exception handler
* prototype.
*
* This approach is better than relying on __clone because handlers may employ generators which cannot be cloned.
* If generators are part of the object's state they must be recreated during initialize().
*/
interface FetchExceptionHandler
{
/**
* Initializes this handler to its starting state. Should be idempotent because it may be called multiple times.
*
* This method must always be called before the first call to __invoke.
*
* @return void
*/
public function initialize();

/**
* Handles a fetch() exception.
*
* @param \Exception $exception Exception thrown by Connector::fetch().
*
* @return void
*/
public function __invoke(\Exception $exception);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php
namespace ScriptFUSION\Porter\Connector\FetchExceptionHandler;

/**
* Contains a fetch exception handler that does not have private state and therefore does not require initialization.
*/
final class StatelessFetchExceptionHandler implements FetchExceptionHandler
{
private $handler;

public function __construct(callable $handler)
{
$this->handler = $handler;
}

public function initialize()
{
// Intentionally empty.
}

public function __invoke(\Exception $exception)
{
call_user_func($this->handler, $exception);
}
}
7 changes: 4 additions & 3 deletions src/Connector/ImportConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
namespace ScriptFUSION\Porter\Connector;

use ScriptFUSION\Porter\Cache\CacheUnavailableException;
use ScriptFUSION\Porter\Connector\FetchExceptionHandler\FetchExceptionHandler;

/**
* Connector whose lifecycle is synchronised with an import operation. Ensures correct ConnectionContext is delivered
Expand Down Expand Up @@ -43,10 +44,10 @@ public function getWrappedConnector()
/**
* Sets the exception handler to be called when a recoverable exception is thrown by Connector::fetch().
*
* @param callable $exceptionHandler Exception handler.
* @param FetchExceptionHandler $exceptionHandler Fetch exception handler.
*/
public function setExceptionHandler(callable $exceptionHandler)
public function setExceptionHandler(FetchExceptionHandler $exceptionHandler)
{
$this->context->setProviderFetchExceptionHandler($exceptionHandler);
$this->context->setResourceFetchExceptionHandler($exceptionHandler);
}
}
15 changes: 8 additions & 7 deletions src/Specification/ImportSpecification.php
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
<?php
namespace ScriptFUSION\Porter\Specification;

use ScriptFUSION\Porter\Connector\FetchExceptionHandler\ExponentialSleepFetchExceptionHandler;
use ScriptFUSION\Porter\Connector\FetchExceptionHandler\FetchExceptionHandler;
use ScriptFUSION\Porter\Provider\Resource\ProviderResource;
use ScriptFUSION\Porter\Transform\Transformer;
use ScriptFUSION\Retry\ExceptionHandler\ExponentialBackoffExceptionHandler;

/**
* Specifies which resource to import and how the data should be transformed.
Expand Down Expand Up @@ -43,7 +44,7 @@ class ImportSpecification
private $maxFetchAttempts = self::DEFAULT_FETCH_ATTEMPTS;

/**
* @var callable
* @var FetchExceptionHandler
*/
private $fetchExceptionHandler;

Expand All @@ -67,7 +68,7 @@ function (Transformer $transformer) {
));

is_object($this->context) && $this->context = clone $this->context;
is_object($this->fetchExceptionHandler) && $this->fetchExceptionHandler = clone $this->fetchExceptionHandler;
$this->fetchExceptionHandler && $this->fetchExceptionHandler = clone $this->fetchExceptionHandler;
}

/**
Expand Down Expand Up @@ -237,21 +238,21 @@ final public function setMaxFetchAttempts($attempts)
/**
* Gets the exception handler invoked each time a fetch attempt fails.
*
* @return callable Exception handler.
* @return FetchExceptionHandler Fetch exception handler.
*/
final public function getFetchExceptionHandler()
{
return $this->fetchExceptionHandler ?: $this->fetchExceptionHandler = new ExponentialBackoffExceptionHandler;
return $this->fetchExceptionHandler ?: $this->fetchExceptionHandler = new ExponentialSleepFetchExceptionHandler;
}

/**
* Sets the exception handler invoked each time a fetch attempt fails.
*
* @param callable $fetchExceptionHandler Exception handler.
* @param FetchExceptionHandler $fetchExceptionHandler Fetch exception handler.
*
* @return $this
*/
final public function setFetchExceptionHandler(callable $fetchExceptionHandler)
final public function setFetchExceptionHandler(FetchExceptionHandler $fetchExceptionHandler)
{
$this->fetchExceptionHandler = $fetchExceptionHandler;

Expand Down
5 changes: 3 additions & 2 deletions test/FixtureFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
namespace ScriptFUSIONTest;

use ScriptFUSION\Porter\Connector\ConnectionContext;
use ScriptFUSION\Porter\Connector\FetchExceptionHandler\StatelessFetchExceptionHandler;
use ScriptFUSION\Porter\Specification\ImportSpecification;
use ScriptFUSION\StaticClass;

Expand All @@ -25,9 +26,9 @@ public static function buildConnectionContext(
) {
return new ConnectionContext(
$cacheAdvice,
$fetchExceptionHandler ?: function () {
$fetchExceptionHandler ?: new StatelessFetchExceptionHandler(static function () {
// Intentionally empty.
},
}),
$maxFetchAttempts
);
}
Expand Down
89 changes: 89 additions & 0 deletions test/Integration/Porter/Connector/ConnectionContextTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php
namespace ScriptFUSIONTest\Integration\Porter\Connector;

use ScriptFUSION\Porter\Connector\ConnectionContext;
use ScriptFUSION\Porter\Connector\FetchExceptionHandler\StatelessFetchExceptionHandler;
use ScriptFUSION\Porter\Connector\RecoverableConnectorException;
use ScriptFUSIONTest\FixtureFactory;
use ScriptFUSIONTest\Stubs\TestFetchExceptionHandler;

/**
* @see ConnectionContext
*/
final class ConnectionContextTest extends \PHPUnit_Framework_TestCase
{
/**
* Tests that when retry() is called multiple times, the original fetch exception handler is unmodified.
* This is expected because the handler must be cloned using the prototype pattern to ensure multiple concurrent
* fetches do not conflict.
*
* @dataProvider provideHandlerAndContext
*/
public function testFetchExceptionHandlerCloned(TestFetchExceptionHandler $handler, ConnectionContext $context)
{
$handler->initialize();
$initial = $handler->getCurrent();

$context->retry(self::createExceptionThrowingClosure());

self::assertSame($initial, $handler->getCurrent());
}

public function provideHandlerAndContext()
{
yield 'User exception handler' => [
$handler = new TestFetchExceptionHandler,
FixtureFactory::buildConnectionContext(false, $handler),
];

$context = FixtureFactory::buildConnectionContext();
// It should be OK to reuse the handler here because the whole point of the test is that it's not modified.
$context->setResourceFetchExceptionHandler($handler);
yield 'Resource exception handler' => [$handler, $context];
}

/**
* Tests that when retry() is called, a stateless fetch exception handler is neither cloned nor reinitialized.
* For stateless handlers, initialization is a NOOP, so avoiding cloning is a small optimization.
*/
public function testStatelessExceptionHandlerNotCloned()
{
$context = FixtureFactory::buildConnectionContext(
false,
$handler = new StatelessFetchExceptionHandler(static function () {
// Intentionally empty.
})
);

$context->retry(self::createExceptionThrowingClosure());

self::assertSame(
$handler,
call_user_func(
\Closure::bind(
function () {
return $this->fetchExceptionHandler;
},
$context,
$context
)
)
);
}

/**
* Creates a closure that only throws an exception on the first invocation.
*
* @return \Closure
*/
private static function createExceptionThrowingClosure()
{
return static function () {
static $invocationCount;

if (!$invocationCount++) {
throw new RecoverableConnectorException;
}
};
}
}
Loading