Commit 19224e74 by Carsten Brandt

Elasticsearch, allow custom options for update+delete

including support for optimistic locking (#1313) fixes #5758
parent de2064d9
...@@ -10,8 +10,10 @@ namespace yii\elasticsearch; ...@@ -10,8 +10,10 @@ namespace yii\elasticsearch;
use Yii; use Yii;
use yii\base\InvalidCallException; use yii\base\InvalidCallException;
use yii\base\InvalidConfigException; use yii\base\InvalidConfigException;
use yii\base\InvalidParamException;
use yii\base\NotSupportedException; use yii\base\NotSupportedException;
use yii\db\BaseActiveRecord; use yii\db\BaseActiveRecord;
use yii\db\StaleObjectException;
use yii\helpers\ArrayHelper; use yii\helpers\ArrayHelper;
use yii\helpers\Inflector; use yii\helpers\Inflector;
use yii\helpers\Json; use yii\helpers\Json;
...@@ -447,6 +449,115 @@ class ActiveRecord extends BaseActiveRecord ...@@ -447,6 +449,115 @@ class ActiveRecord extends BaseActiveRecord
} }
/** /**
* @inheritdoc
*
* @param boolean $runValidation whether to perform validation before saving the record.
* If the validation fails, the record will not be inserted into the database.
* @param array $attributeNames list of attribute names that need to be saved. Defaults to null,
* meaning all attributes that are loaded from DB will be saved.
* @param array $options options given in this parameter are passed to elasticsearch
* as request URI parameters. These are among others:
*
* - `routing` define shard placement of this record.
* - `parent` by giving the primaryKey of another record this defines a parent-child relation
* - `timeout` timeout waiting for a shard to become available.
* - `replication` the replication type for the delete/index operation (sync or async).
* - `consistency` the write consistency of the index/delete operation.
* - `refresh` refresh the relevant primary and replica shards (not the whole index) immediately after the operation occurs, so that the updated document appears in search results immediately.
* - `detect_noop` this parameter will become part of the request body and will prevent the index from getting updated when nothing has changed.
*
* Please refer to the [elasticsearch documentation](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-update.html#_parameters_3)
* for more details on these options.
*
* The following parameters are Yii specific:
*
* - `optimistic_locking` set this to `true` to enable optimistic locking, avoid updating when the record has changed since it
* has been loaded from the database. Yii will set the `version` parameter to the value stored in [[version]].
* See the [elasticsearch documentation](http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html) for details.
*
* Make sure the record has been fetched with a [[version]] before. This is only the case
* for records fetched via [[get()]] and [[mget()]] by default. For normal queries, the `_version` field has to be fetched explicitly.
*
* @return integer|boolean the number of rows affected, or false if validation fails
* or [[beforeSave()]] stops the updating process.
* @throws StaleObjectException if optimistic locking is enabled and the data being updated is outdated.
* @throws InvalidParamException if no [[version]] is available and optimistic locking is enabled.
* @throws Exception in case update failed.
*/
public function update($runValidation = true, $attributeNames = null, $options = [])
{
if ($runValidation && !$this->validate($attributeNames)) {
return false;
}
return $this->updateInternal($attributeNames, $options);
}
/**
* @see update()
* @param array $attributes attributes to update
* @param array $options options given in this parameter are passed to elasticsearch
* as request URI parameters. See [[update()]] for details.
* @return integer|boolean the number of rows affected, or false if validation fails
* or [[beforeSave()]] stops the updating process.
* @throws StaleObjectException if optimistic locking is enabled and the data being updated is outdated.
* @throws InvalidParamException if no [[version]] is available and optimistic locking is enabled.
* @throws Exception in case update failed.
*/
protected function updateInternal($attributes = null, $options = [])
{
if (!$this->beforeSave(false)) {
return false;
}
$values = $this->getDirtyAttributes($attributes);
if (empty($values)) {
$this->afterSave(false, $values);
return 0;
}
if (isset($options['optimistic_locking']) && $options['optimistic_locking']) {
if ($this->_version === null) {
throw new InvalidParamException('Unable to use optimistic locking on a record that has no version set. Refer to the docs of ActiveRecord::update() for details.');
}
$options['version'] = $this->_version;
unset($options['optimistic_locking']);
}
try {
$result = static::getDb()->createCommand()->update(
static::index(),
static::type(),
$this->getOldPrimaryKey(false),
$values,
$options
);
} catch(Exception $e) {
// HTTP 409 is the response in case of failed optimistic locking
// http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html
if (isset($e->errorInfo['responseCode']) && $e->errorInfo['responseCode'] == 409) {
throw new StaleObjectException('The object being updated is outdated.', $e->errorInfo, $e->getCode(), $e);
}
throw $e;
}
if (is_array($result) && isset($result['_version'])) {
$this->_version = $result['_version'];
}
$changedAttributes = [];
foreach ($values as $name => $value) {
$changedAttributes[$name] = $this->getOldAttribute($name);
$this->setOldAttribute($name, $value);
}
$this->afterSave(false, $changedAttributes);
if ($result === false) {
return 0;
} else {
return 1;
}
}
/**
* Updates all records whos primary keys are given. * Updates all records whos primary keys are given.
* For example, to change the status to be 1 for all customers whose status is 2: * For example, to change the status to be 1 for all customers whose status is 2:
* *
...@@ -572,6 +683,76 @@ class ActiveRecord extends BaseActiveRecord ...@@ -572,6 +683,76 @@ class ActiveRecord extends BaseActiveRecord
} }
/** /**
* @inheritdoc
*
* @param array $options options given in this parameter are passed to elasticsearch
* as request URI parameters. These are among others:
*
* - `routing` define shard placement of this record.
* - `parent` by giving the primaryKey of another record this defines a parent-child relation
* - `timeout` timeout waiting for a shard to become available.
* - `replication` the replication type for the delete/index operation (sync or async).
* - `consistency` the write consistency of the index/delete operation.
* - `refresh` refresh the relevant primary and replica shards (not the whole index) immediately after the operation occurs, so that the updated document appears in search results immediately.
*
* Please refer to the [elasticsearch documentation](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-delete.html)
* for more details on these options.
*
* The following parameters are Yii specific:
*
* - `optimistic_locking` set this to `true` to enable optimistic locking, avoid updating when the record has changed since it
* has been loaded from the database. Yii will set the `version` parameter to the value stored in [[version]].
* See the [elasticsearch documentation](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-delete.html#delete-versioning) for details.
*
* Make sure the record has been fetched with a [[version]] before. This is only the case
* for records fetched via [[get()]] and [[mget()]] by default. For normal queries, the `_version` field has to be fetched explicitly.
*
* @return integer|boolean the number of rows deleted, or false if the deletion is unsuccessful for some reason.
* Note that it is possible the number of rows deleted is 0, even though the deletion execution is successful.
* @throws StaleObjectException if optimistic locking is enabled and the data being deleted is outdated.
* @throws Exception in case delete failed.
*/
public function delete($options = [])
{
if (!$this->beforeDelete()) {
return false;
}
if (isset($options['optimistic_locking']) && $options['optimistic_locking']) {
if ($this->_version === null) {
throw new InvalidParamException('Unable to use optimistic locking on a record that has no version set. Refer to the docs of ActiveRecord::delete() for details.');
}
$options['version'] = $this->_version;
unset($options['optimistic_locking']);
}
try {
$result = static::getDb()->createCommand()->delete(
static::index(),
static::type(),
$this->getOldPrimaryKey(false),
$options
);
} catch(Exception $e) {
// HTTP 409 is the response in case of failed optimistic locking
// http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html
if (isset($e->errorInfo['responseCode']) && $e->errorInfo['responseCode'] == 409) {
throw new StaleObjectException('The object being deleted is outdated.', $e->errorInfo, $e->getCode(), $e);
}
throw $e;
}
$this->setOldAttributes(null);
$this->afterDelete();
if ($result === false) {
return 0;
} else {
return 1;
}
}
/**
* Deletes rows in the table using the provided conditions. * Deletes rows in the table using the provided conditions.
* WARNING: If you do not specify any condition, this method will delete ALL rows in the table. * WARNING: If you do not specify any condition, this method will delete ALL rows in the table.
* *
...@@ -630,6 +811,17 @@ class ActiveRecord extends BaseActiveRecord ...@@ -630,6 +811,17 @@ class ActiveRecord extends BaseActiveRecord
} }
/** /**
* This method has no effect in Elasticsearch ActiveRecord.
*
* Elasticsearch ActiveRecord uses [native Optimistic locking](http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html).
* See [[update()]] for more details.
*/
public function optimisticLock()
{
return null;
}
/**
* Destroys the relationship in current model. * Destroys the relationship in current model.
* *
* This method is not supported by elasticsearch. * This method is not supported by elasticsearch.
......
...@@ -6,6 +6,8 @@ Yii Framework 2 elasticsearch extension Change Log ...@@ -6,6 +6,8 @@ Yii Framework 2 elasticsearch extension Change Log
- Bug #5662: Elasticsearch AR updateCounters() now uses explicitly `groovy` script for updating making it compatible with ES >1.3.0 (cebe) - Bug #5662: Elasticsearch AR updateCounters() now uses explicitly `groovy` script for updating making it compatible with ES >1.3.0 (cebe)
- Bug #6065: `ActiveRecord::unlink()` was failing in some situations when working with relations via array valued attributes (cebe) - Bug #6065: `ActiveRecord::unlink()` was failing in some situations when working with relations via array valued attributes (cebe)
- Enh #5758: Allow passing custom options to `ActiveRecord::update()` and `::delete()` including support for routing needed for updating records with parent relation (cebe)
- Enh: Add support for optimistic locking (cebe)
2.0.0 October 12, 2014 2.0.0 October 12, 2014
......
...@@ -220,11 +220,18 @@ class Command extends Component ...@@ -220,11 +220,18 @@ class Command extends Component
* @return mixed * @return mixed
* @see http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-update.html * @see http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-update.html
*/ */
// public function update($index, $type, $id, $data, $options = []) public function update($index, $type, $id, $data, $options = [])
// { {
// // TODO implement $body = [
//// return $this->db->delete([$index, $type, $id], $options); 'doc' => empty($data) ? new \stdClass() : $data,
// } ];
if (isset($options["detect_noop"])) {
$body["detect_noop"] = $options["detect_noop"];
unset($options["detect_noop"]);
}
return $this->db->post([$index, $type, $id, '_update'], $options, Json::encode($body));
}
// TODO bulk http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html // TODO bulk http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html
......
...@@ -198,7 +198,6 @@ class Connection extends Component ...@@ -198,7 +198,6 @@ class Connection extends Component
public function get($url, $options = [], $body = null, $raw = false) public function get($url, $options = [], $body = null, $raw = false)
{ {
$this->open(); $this->open();
return $this->httpRequest('GET', $this->createUrl($url, $options), $body, $raw); return $this->httpRequest('GET', $this->createUrl($url, $options), $body, $raw);
} }
...@@ -215,7 +214,6 @@ class Connection extends Component ...@@ -215,7 +214,6 @@ class Connection extends Component
public function head($url, $options = [], $body = null) public function head($url, $options = [], $body = null)
{ {
$this->open(); $this->open();
return $this->httpRequest('HEAD', $this->createUrl($url, $options), $body); return $this->httpRequest('HEAD', $this->createUrl($url, $options), $body);
} }
...@@ -233,7 +231,6 @@ class Connection extends Component ...@@ -233,7 +231,6 @@ class Connection extends Component
public function post($url, $options = [], $body = null, $raw = false) public function post($url, $options = [], $body = null, $raw = false)
{ {
$this->open(); $this->open();
return $this->httpRequest('POST', $this->createUrl($url, $options), $body, $raw); return $this->httpRequest('POST', $this->createUrl($url, $options), $body, $raw);
} }
...@@ -251,7 +248,6 @@ class Connection extends Component ...@@ -251,7 +248,6 @@ class Connection extends Component
public function put($url, $options = [], $body = null, $raw = false) public function put($url, $options = [], $body = null, $raw = false)
{ {
$this->open(); $this->open();
return $this->httpRequest('PUT', $this->createUrl($url, $options), $body, $raw); return $this->httpRequest('PUT', $this->createUrl($url, $options), $body, $raw);
} }
...@@ -269,7 +265,6 @@ class Connection extends Component ...@@ -269,7 +265,6 @@ class Connection extends Component
public function delete($url, $options = [], $body = null, $raw = false) public function delete($url, $options = [], $body = null, $raw = false)
{ {
$this->open(); $this->open();
return $this->httpRequest('DELETE', $this->createUrl($url, $options), $body, $raw); return $this->httpRequest('DELETE', $this->createUrl($url, $options), $body, $raw);
} }
...@@ -319,7 +314,7 @@ class Connection extends Component ...@@ -319,7 +314,7 @@ class Connection extends Component
$body = ''; $body = '';
$options = [ $options = [
CURLOPT_USERAGENT => 'Yii Framework ' . Yii::getVersion() . __CLASS__, CURLOPT_USERAGENT => 'Yii Framework ' . Yii::getVersion() . ' ' . __CLASS__,
CURLOPT_RETURNTRANSFER => false, CURLOPT_RETURNTRANSFER => false,
CURLOPT_HEADER => false, CURLOPT_HEADER => false,
// http://www.php.net/manual/en/function.curl-setopt.php#82418 // http://www.php.net/manual/en/function.curl-setopt.php#82418
...@@ -327,7 +322,6 @@ class Connection extends Component ...@@ -327,7 +322,6 @@ class Connection extends Component
CURLOPT_WRITEFUNCTION => function ($curl, $data) use (&$body) { CURLOPT_WRITEFUNCTION => function ($curl, $data) use (&$body) {
$body .= $data; $body .= $data;
return mb_strlen($data, '8bit'); return mb_strlen($data, '8bit');
}, },
CURLOPT_HEADERFUNCTION => function ($curl, $data) use (&$headers) { CURLOPT_HEADERFUNCTION => function ($curl, $data) use (&$headers) {
...@@ -336,7 +330,6 @@ class Connection extends Component ...@@ -336,7 +330,6 @@ class Connection extends Component
$headers[strtolower(substr($row, 0, $pos))] = trim(substr($row, $pos + 1)); $headers[strtolower(substr($row, 0, $pos))] = trim(substr($row, $pos + 1));
} }
} }
return mb_strlen($data, '8bit'); return mb_strlen($data, '8bit');
}, },
CURLOPT_CUSTOMREQUEST => $method, CURLOPT_CUSTOMREQUEST => $method,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment