|
|
@@ -57,12 +57,14 @@ class Connection implements ConnectionInterface
|
|
|
protected $_config;
|
|
|
|
|
|
/**
|
|
|
- * Driver object, responsible for creating the real connection
|
|
|
- * and provide specific SQL dialect.
|
|
|
- *
|
|
|
* @var \Cake\Database\DriverInterface
|
|
|
*/
|
|
|
- protected $_driver;
|
|
|
+ protected DriverInterface $readDriver;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @var \Cake\Database\DriverInterface
|
|
|
+ */
|
|
|
+ protected DriverInterface $writeDriver;
|
|
|
|
|
|
/**
|
|
|
* Contains how many nested transactions have been started.
|
|
|
@@ -139,19 +141,61 @@ class Connection implements ConnectionInterface
|
|
|
public function __construct(array $config)
|
|
|
{
|
|
|
$this->_config = $config;
|
|
|
+ [self::ROLE_READ => $this->readDriver, self::ROLE_WRITE => $this->writeDriver] = $this->createDrivers($config);
|
|
|
+
|
|
|
+ if (!empty($config['log'])) {
|
|
|
+ $this->enableQueryLogging((bool)$config['log']);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates read and write drivers.
|
|
|
+ *
|
|
|
+ * @param array $config Connection config
|
|
|
+ * @return array<string, \Cake\Database\DriverInterface>
|
|
|
+ * @psalm-return array{read: \Cake\Database\DriverInterface, write: \Cake\Database\DriverInterface}
|
|
|
+ */
|
|
|
+ protected function createDrivers(array $config): array
|
|
|
+ {
|
|
|
+ $driver = $config['driver'] ?? '';
|
|
|
+ if (!is_string($driver)) {
|
|
|
+ /** @var \Cake\Database\DriverInterface $driver */
|
|
|
+ if (!$driver->enabled()) {
|
|
|
+ throw new MissingExtensionException(['driver' => get_class($driver), 'name' => $this->configName()]);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Legacy support for setting instance instead of driver class
|
|
|
+ return [self::ROLE_READ => $driver, self::ROLE_WRITE => $driver];
|
|
|
+ }
|
|
|
+
|
|
|
+ /** @var class-string<\Cake\Database\DriverInterface>|null $driverClass */
|
|
|
+ $driverClass = App::className($driver, 'Database/Driver');
|
|
|
+ if ($driverClass === null) {
|
|
|
+ throw new MissingDriverException(['driver' => $driver, 'connection' => $this->configName()]);
|
|
|
+ }
|
|
|
|
|
|
- $driverConfig = array_diff_key($config, array_flip([
|
|
|
+ $sharedConfig = array_diff_key($config, array_flip([
|
|
|
'name',
|
|
|
'driver',
|
|
|
'log',
|
|
|
'cacheMetaData',
|
|
|
'cacheKeyPrefix',
|
|
|
]));
|
|
|
- $this->_driver = $this->createDriver($config['driver'] ?? '', $driverConfig);
|
|
|
|
|
|
- if (!empty($config['log'])) {
|
|
|
- $this->enableQueryLogging((bool)$config['log']);
|
|
|
+ $writeConfig = $config['write'] ?? [] + $sharedConfig;
|
|
|
+ $readConfig = $config['read'] ?? [] + $sharedConfig;
|
|
|
+ if ($readConfig == $writeConfig) {
|
|
|
+ $readDriver = $writeDriver = new $driverClass(['_role' => self::ROLE_WRITE] + $writeConfig);
|
|
|
+ } else {
|
|
|
+ $readDriver = new $driverClass(['_role' => self::ROLE_READ] + $readConfig);
|
|
|
+ $writeDriver = new $driverClass(['_role' => self::ROLE_WRITE] + $writeConfig);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!$writeDriver->enabled()) {
|
|
|
+ throw new MissingExtensionException(['driver' => get_class($writeDriver), 'name' => $this->configName()]);
|
|
|
}
|
|
|
+
|
|
|
+ return [self::ROLE_READ => $readDriver, self::ROLE_WRITE => $writeDriver];
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -207,7 +251,8 @@ class Connection implements ConnectionInterface
|
|
|
{
|
|
|
deprecationWarning('Setting the driver is deprecated. Use the connection config instead.');
|
|
|
|
|
|
- $this->_driver = $this->createDriver($driver, $config);
|
|
|
+ $driver = $this->createDriver($driver, $config);
|
|
|
+ $this->readDriver = $this->writeDriver = $driver;
|
|
|
|
|
|
return $this;
|
|
|
}
|
|
|
@@ -230,7 +275,7 @@ class Connection implements ConnectionInterface
|
|
|
if ($className === null) {
|
|
|
throw new MissingDriverException(['driver' => $driver, 'connection' => $this->configName()]);
|
|
|
}
|
|
|
- $driver = new $className($config);
|
|
|
+ $driver = new $className(['_role' => self::ROLE_WRITE] + $config);
|
|
|
}
|
|
|
|
|
|
if (!$driver->enabled()) {
|
|
|
@@ -254,11 +299,14 @@ class Connection implements ConnectionInterface
|
|
|
/**
|
|
|
* Gets the driver instance.
|
|
|
*
|
|
|
+ * @param string $role Connection role ('read' or 'write')
|
|
|
* @return \Cake\Database\DriverInterface
|
|
|
*/
|
|
|
- public function getDriver(): DriverInterface
|
|
|
+ public function getDriver(string $role = self::ROLE_WRITE): DriverInterface
|
|
|
{
|
|
|
- return $this->_driver;
|
|
|
+ assert($role === self::ROLE_READ || $role === self::ROLE_WRITE);
|
|
|
+
|
|
|
+ return $role === self::ROLE_READ ? $this->readDriver : $this->writeDriver;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -269,20 +317,25 @@ class Connection implements ConnectionInterface
|
|
|
*/
|
|
|
public function connect(): bool
|
|
|
{
|
|
|
- try {
|
|
|
- return $this->_driver->connect();
|
|
|
- } catch (MissingConnectionException $e) {
|
|
|
- throw $e;
|
|
|
- } catch (Throwable $e) {
|
|
|
- throw new MissingConnectionException(
|
|
|
- [
|
|
|
- 'driver' => App::shortName(get_class($this->_driver), 'Database/Driver'),
|
|
|
- 'reason' => $e->getMessage(),
|
|
|
- ],
|
|
|
- null,
|
|
|
- $e
|
|
|
- );
|
|
|
+ $connected = true;
|
|
|
+ foreach ([self::ROLE_READ, self::ROLE_WRITE] as $role) {
|
|
|
+ try {
|
|
|
+ $connected = $connected && $this->getDriver($role)->connect();
|
|
|
+ } catch (MissingConnectionException $e) {
|
|
|
+ throw $e;
|
|
|
+ } catch (Throwable $e) {
|
|
|
+ throw new MissingConnectionException(
|
|
|
+ [
|
|
|
+ 'driver' => App::shortName(get_class($this->getDriver($role)), 'Database/Driver'),
|
|
|
+ 'reason' => $e->getMessage(),
|
|
|
+ ],
|
|
|
+ null,
|
|
|
+ $e
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+ return $connected;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -292,7 +345,8 @@ class Connection implements ConnectionInterface
|
|
|
*/
|
|
|
public function disconnect(): void
|
|
|
{
|
|
|
- $this->_driver->disconnect();
|
|
|
+ $this->getDriver(self::ROLE_READ)->disconnect();
|
|
|
+ $this->getDriver(self::ROLE_WRITE)->disconnect();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -302,7 +356,7 @@ class Connection implements ConnectionInterface
|
|
|
*/
|
|
|
public function isConnected(): bool
|
|
|
{
|
|
|
- return $this->_driver->isConnected();
|
|
|
+ return $this->getDriver(self::ROLE_READ)->isConnected() && $this->getDriver(self::ROLE_WRITE)->isConnected();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -313,8 +367,10 @@ class Connection implements ConnectionInterface
|
|
|
*/
|
|
|
public function prepare($query): StatementInterface
|
|
|
{
|
|
|
- return $this->getDisconnectRetry()->run(function () use ($query) {
|
|
|
- $statement = $this->_driver->prepare($query);
|
|
|
+ $role = $query instanceof Query ? $query->getConnectionRole() : self::ROLE_WRITE;
|
|
|
+
|
|
|
+ return $this->getDisconnectRetry()->run(function () use ($query, $role) {
|
|
|
+ $statement = $this->getDriver($role)->prepare($query);
|
|
|
|
|
|
if ($this->_logQueries) {
|
|
|
$statement = $this->_newLogger($statement);
|
|
|
@@ -356,7 +412,7 @@ class Connection implements ConnectionInterface
|
|
|
*/
|
|
|
public function compileQuery(Query $query, ValueBinder $binder): string
|
|
|
{
|
|
|
- return $this->getDriver()->compileQuery($query, $binder)[1];
|
|
|
+ return $this->getDriver($query->getConnectionRole())->compileQuery($query, $binder)[1];
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -602,7 +658,7 @@ class Connection implements ConnectionInterface
|
|
|
}
|
|
|
|
|
|
$this->getDisconnectRetry()->run(function (): void {
|
|
|
- $this->_driver->beginTransaction();
|
|
|
+ $this->getDriver()->beginTransaction();
|
|
|
});
|
|
|
|
|
|
$this->_transactionLevel = 0;
|
|
|
@@ -643,7 +699,7 @@ class Connection implements ConnectionInterface
|
|
|
$this->log('COMMIT');
|
|
|
}
|
|
|
|
|
|
- return $this->_driver->commitTransaction();
|
|
|
+ return $this->getDriver()->commitTransaction();
|
|
|
}
|
|
|
if ($this->isSavePointsEnabled()) {
|
|
|
$this->releaseSavePoint((string)$this->_transactionLevel);
|
|
|
@@ -678,7 +734,7 @@ class Connection implements ConnectionInterface
|
|
|
if ($this->_logQueries) {
|
|
|
$this->log('ROLLBACK');
|
|
|
}
|
|
|
- $this->_driver->rollbackTransaction();
|
|
|
+ $this->getDriver()->rollbackTransaction();
|
|
|
|
|
|
return true;
|
|
|
}
|
|
|
@@ -707,7 +763,7 @@ class Connection implements ConnectionInterface
|
|
|
if ($enable === false) {
|
|
|
$this->_useSavePoints = false;
|
|
|
} else {
|
|
|
- $this->_useSavePoints = $this->_driver->supports(DriverInterface::FEATURE_SAVEPOINT);
|
|
|
+ $this->_useSavePoints = $this->getDriver()->supports(DriverInterface::FEATURE_SAVEPOINT);
|
|
|
}
|
|
|
|
|
|
return $this;
|
|
|
@@ -743,7 +799,7 @@ class Connection implements ConnectionInterface
|
|
|
*/
|
|
|
public function createSavePoint($name): void
|
|
|
{
|
|
|
- $this->execute($this->_driver->savePointSQL($name))->closeCursor();
|
|
|
+ $this->execute($this->getDriver()->savePointSQL($name))->closeCursor();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -754,7 +810,7 @@ class Connection implements ConnectionInterface
|
|
|
*/
|
|
|
public function releaseSavePoint($name): void
|
|
|
{
|
|
|
- $sql = $this->_driver->releaseSavePointSQL($name);
|
|
|
+ $sql = $this->getDriver()->releaseSavePointSQL($name);
|
|
|
if ($sql) {
|
|
|
$this->execute($sql)->closeCursor();
|
|
|
}
|
|
|
@@ -768,7 +824,7 @@ class Connection implements ConnectionInterface
|
|
|
*/
|
|
|
public function rollbackSavepoint($name): void
|
|
|
{
|
|
|
- $this->execute($this->_driver->rollbackSavePointSQL($name))->closeCursor();
|
|
|
+ $this->execute($this->getDriver()->rollbackSavePointSQL($name))->closeCursor();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -779,7 +835,7 @@ class Connection implements ConnectionInterface
|
|
|
public function disableForeignKeys(): void
|
|
|
{
|
|
|
$this->getDisconnectRetry()->run(function (): void {
|
|
|
- $this->execute($this->_driver->disableForeignKeySQL())->closeCursor();
|
|
|
+ $this->execute($this->getDriver()->disableForeignKeySQL())->closeCursor();
|
|
|
});
|
|
|
}
|
|
|
|
|
|
@@ -791,7 +847,7 @@ class Connection implements ConnectionInterface
|
|
|
public function enableForeignKeys(): void
|
|
|
{
|
|
|
$this->getDisconnectRetry()->run(function (): void {
|
|
|
- $this->execute($this->_driver->enableForeignKeySQL())->closeCursor();
|
|
|
+ $this->execute($this->getDriver()->enableForeignKeySQL())->closeCursor();
|
|
|
});
|
|
|
}
|
|
|
|
|
|
@@ -804,7 +860,7 @@ class Connection implements ConnectionInterface
|
|
|
*/
|
|
|
public function supportsDynamicConstraints(): bool
|
|
|
{
|
|
|
- return $this->_driver->supportsDynamicConstraints();
|
|
|
+ return $this->getDriver()->supportsDynamicConstraints();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -888,7 +944,7 @@ class Connection implements ConnectionInterface
|
|
|
{
|
|
|
[$value, $type] = $this->cast($value, $type);
|
|
|
|
|
|
- return $this->_driver->quote($value, $type);
|
|
|
+ return $this->getDriver()->quote($value, $type);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -900,7 +956,7 @@ class Connection implements ConnectionInterface
|
|
|
*/
|
|
|
public function supportsQuoting(): bool
|
|
|
{
|
|
|
- return $this->_driver->supports(DriverInterface::FEATURE_QUOTE);
|
|
|
+ return $this->getDriver()->supports(DriverInterface::FEATURE_QUOTE);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -914,7 +970,7 @@ class Connection implements ConnectionInterface
|
|
|
*/
|
|
|
public function quoteIdentifier(string $identifier): string
|
|
|
{
|
|
|
- return $this->_driver->quoteIdentifier($identifier);
|
|
|
+ return $this->getDriver()->quoteIdentifier($identifier);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -1061,7 +1117,7 @@ class Connection implements ConnectionInterface
|
|
|
*/
|
|
|
protected function _newLogger(StatementInterface $statement): LoggingStatement
|
|
|
{
|
|
|
- $log = new LoggingStatement($statement, $this->_driver);
|
|
|
+ $log = new LoggingStatement($statement, $this->getDriver());
|
|
|
$log->setLogger($this->getLogger());
|
|
|
|
|
|
return $log;
|
|
|
@@ -1085,10 +1141,19 @@ class Connection implements ConnectionInterface
|
|
|
$replace = array_intersect_key($secrets, $this->_config);
|
|
|
$config = $replace + $this->_config;
|
|
|
|
|
|
+ if (isset($config['read'])) {
|
|
|
+ /** @psalm-suppress PossiblyInvalidArgument */
|
|
|
+ $config['read'] = array_intersect_key($secrets, $config['read']) + $config['read'];
|
|
|
+ }
|
|
|
+ if (isset($config['write'])) {
|
|
|
+ /** @psalm-suppress PossiblyInvalidArgument */
|
|
|
+ $config['write'] = array_intersect_key($secrets, $config['write']) + $config['write'];
|
|
|
+ }
|
|
|
+
|
|
|
return [
|
|
|
'config' => $config,
|
|
|
- 'driver' => $this->_driver,
|
|
|
- 'role' => $this->role(),
|
|
|
+ 'readDriver' => $this->readDriver,
|
|
|
+ 'writeDriver' => $this->writeDriver,
|
|
|
'transactionLevel' => $this->_transactionLevel,
|
|
|
'transactionStarted' => $this->_transactionStarted,
|
|
|
'useSavePoints' => $this->_useSavePoints,
|