vendor/shopware/elasticsearch/Admin/AdminSearchRegistry.php line 157

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Elasticsearch\Admin;
  3. use Doctrine\DBAL\Connection;
  4. use Elasticsearch\Client;
  5. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
  6. use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
  7. use Shopware\Core\Framework\Event\ProgressFinishedEvent;
  8. use Shopware\Core\Framework\Event\ProgressStartedEvent;
  9. use Shopware\Core\Framework\Log\Package;
  10. use Shopware\Core\Framework\Uuid\Uuid;
  11. use Shopware\Elasticsearch\Admin\Indexer\AbstractAdminIndexer;
  12. use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException;
  13. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  14. use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
  15. use Symfony\Component\Messenger\MessageBusInterface;
  16. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  17. /**
  18. * @internal
  19. *
  20. * @final
  21. */
  22. #[Package('system-settings')]
  23. class AdminSearchRegistry implements MessageHandlerInterface, EventSubscriberInterface
  24. {
  25. /**
  26. * @var array<string, mixed>
  27. */
  28. private array $indexer;
  29. private Connection $connection;
  30. private MessageBusInterface $queue;
  31. private EventDispatcherInterface $dispatcher;
  32. private Client $client;
  33. private AdminElasticsearchHelper $adminEsHelper;
  34. /**
  35. * @var array<mixed>
  36. */
  37. private array $config;
  38. /**
  39. * @var array<mixed>
  40. */
  41. private array $mapping;
  42. /**
  43. * @param AbstractAdminIndexer[] $indexer
  44. * @param array<mixed> $config
  45. * @param array<mixed> $mapping
  46. */
  47. public function __construct(
  48. $indexer,
  49. Connection $connection,
  50. MessageBusInterface $queue,
  51. EventDispatcherInterface $dispatcher,
  52. Client $client,
  53. AdminElasticsearchHelper $adminEsHelper,
  54. array $config,
  55. array $mapping
  56. ) {
  57. $this->indexer = $indexer instanceof \Traversable ? iterator_to_array($indexer) : $indexer;
  58. $this->connection = $connection;
  59. $this->queue = $queue;
  60. $this->dispatcher = $dispatcher;
  61. $this->client = $client;
  62. $this->adminEsHelper = $adminEsHelper;
  63. $this->mapping = $mapping;
  64. if (isset($config['settings']['index'])) {
  65. if (\array_key_exists('number_of_shards', $config['settings']['index']) && $config['settings']['index']['number_of_shards'] === null) {
  66. unset($config['settings']['index']['number_of_shards']);
  67. }
  68. if (\array_key_exists('number_of_replicas', $config['settings']['index']) && $config['settings']['index']['number_of_replicas'] === null) {
  69. unset($config['settings']['index']['number_of_replicas']);
  70. }
  71. }
  72. $this->config = $config;
  73. }
  74. public function __invoke(AdminSearchIndexingMessage $message): void
  75. {
  76. $indexer = $this->getIndexer($message->getEntity());
  77. $documents = $indexer->fetch($message->getIds());
  78. $this->push($indexer, $message->getIndices(), $documents, $message->getIds());
  79. }
  80. public static function getSubscribedEvents(): array
  81. {
  82. return [
  83. EntityWrittenContainerEvent::class => [
  84. ['refresh', -1000],
  85. ],
  86. ];
  87. }
  88. /**
  89. * @return iterable<class-string>
  90. */
  91. public static function getHandledMessages(): iterable
  92. {
  93. return [
  94. AdminSearchIndexingMessage::class,
  95. ];
  96. }
  97. public function iterate(AdminIndexingBehavior $indexingBehavior): void
  98. {
  99. if ($this->adminEsHelper->getEnabled() === false) {
  100. return;
  101. }
  102. /** @var array<string> $entities */
  103. $entities = array_keys($this->indexer);
  104. if ($indexingBehavior->getOnlyEntities()) {
  105. $entities = array_intersect($entities, $indexingBehavior->getOnlyEntities());
  106. } elseif ($indexingBehavior->getSkipEntities()) {
  107. $entities = array_diff($entities, $indexingBehavior->getSkipEntities());
  108. }
  109. $indices = $this->createIndices($entities);
  110. foreach ($entities as $entityName) {
  111. $indexer = $this->getIndexer($entityName);
  112. $iterator = $indexer->getIterator();
  113. $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $iterator->fetchCount()));
  114. while ($ids = $iterator->fetch()) {
  115. // we provide no queue when the data is sent by the admin
  116. if ($indexingBehavior->getNoQueue() === true) {
  117. $this->__invoke(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices, $ids));
  118. } else {
  119. $this->queue->dispatch(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices, $ids));
  120. }
  121. $this->dispatcher->dispatch(new ProgressAdvancedEvent(\count($ids)));
  122. }
  123. $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
  124. }
  125. $this->swapAlias($indices);
  126. }
  127. public function refresh(EntityWrittenContainerEvent $event): void
  128. {
  129. if ($this->adminEsHelper->getEnabled() === false || !$this->isIndexedEntityWritten($event)) {
  130. return;
  131. }
  132. if ($this->adminEsHelper->getRefreshIndices()) {
  133. $this->refreshIndices();
  134. }
  135. /** @var array<string, string> $indices */
  136. $indices = $this->connection->fetchAllKeyValue('SELECT `alias`, `index` FROM admin_elasticsearch_index_task');
  137. if (empty($indices)) {
  138. return;
  139. }
  140. foreach ($this->indexer as $indexer) {
  141. $ids = $event->getPrimaryKeys($indexer->getEntity());
  142. if (empty($ids)) {
  143. continue;
  144. }
  145. $documents = $indexer->fetch($ids);
  146. $this->push($indexer, $indices, $documents, $ids);
  147. }
  148. }
  149. /**
  150. * @return AbstractAdminIndexer[]
  151. */
  152. public function getIndexers(): iterable
  153. {
  154. return $this->indexer;
  155. }
  156. public function getIndexer(string $name): AbstractAdminIndexer
  157. {
  158. $indexer = $this->indexer[$name] ?? null;
  159. if ($indexer) {
  160. return $indexer;
  161. }
  162. throw new ElasticsearchIndexingException([\sprintf('Indexer for name %s not found', $name)]);
  163. }
  164. private function isIndexedEntityWritten(EntityWrittenContainerEvent $event): bool
  165. {
  166. foreach ($this->indexer as $indexer) {
  167. $ids = $event->getPrimaryKeys($indexer->getEntity());
  168. if (!empty($ids)) {
  169. return true;
  170. }
  171. }
  172. return false;
  173. }
  174. /**
  175. * @param array<string, string> $indices
  176. * @param array<string, array<string|int, string>> $data
  177. * @param array<string> $ids
  178. */
  179. private function push(AbstractAdminIndexer $indexer, array $indices, array $data, array $ids): void
  180. {
  181. $alias = $this->adminEsHelper->getIndex($indexer->getName());
  182. if (!isset($indices[$alias])) {
  183. return;
  184. }
  185. $toRemove = array_filter($ids, static function (string $id) use ($data): bool {
  186. return !isset($data[$id]);
  187. });
  188. $documents = [];
  189. foreach ($data as $id => $document) {
  190. $documents[] = ['index' => ['_id' => $id]];
  191. $documents[] = \array_replace(
  192. ['entityName' => $indexer->getEntity(), 'parameters' => [], 'textBoosted' => '', 'text' => ''],
  193. $document
  194. );
  195. }
  196. foreach ($toRemove as $id) {
  197. $documents[] = ['delete' => ['_id' => $id]];
  198. }
  199. $arguments = [
  200. 'index' => $indices[$alias],
  201. 'body' => $documents,
  202. ];
  203. $result = $this->client->bulk($arguments);
  204. if (\is_array($result) && !empty($result['errors'])) {
  205. $errors = $this->parseErrors($result);
  206. throw new ElasticsearchIndexingException($errors);
  207. }
  208. }
  209. /**
  210. * @param array<string> $entities
  211. *
  212. * @throws \Doctrine\DBAL\Exception
  213. *
  214. * @return array<string, string>
  215. */
  216. private function createIndices(array $entities): array
  217. {
  218. $indexTasks = [];
  219. $indices = [];
  220. foreach ($entities as $entityName) {
  221. $indexer = $this->getIndexer($entityName);
  222. $alias = $this->adminEsHelper->getIndex($indexer->getName());
  223. $index = $alias . '_' . time();
  224. if ($this->indexExists($index)) {
  225. continue;
  226. }
  227. $indices[$alias] = $index;
  228. $this->create($indexer, $index, $alias);
  229. $iterator = $indexer->getIterator();
  230. $indexTasks[] = [
  231. 'id' => Uuid::randomBytes(),
  232. '`entity`' => $indexer->getEntity(),
  233. '`index`' => $index,
  234. '`alias`' => $alias,
  235. '`doc_count`' => $iterator->fetchCount(),
  236. ];
  237. }
  238. $this->connection->executeStatement(
  239. 'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
  240. ['entities' => $entities],
  241. ['entities' => Connection::PARAM_STR_ARRAY]
  242. );
  243. foreach ($indexTasks as $task) {
  244. $this->connection->insert('admin_elasticsearch_index_task', $task);
  245. }
  246. return $indices;
  247. }
  248. private function refreshIndices(): void
  249. {
  250. $entities = [];
  251. $indexTasks = [];
  252. foreach ($this->indexer as $indexer) {
  253. $alias = $this->adminEsHelper->getIndex($indexer->getName());
  254. if ($this->aliasExists($alias)) {
  255. continue;
  256. }
  257. $index = $alias . '_' . time();
  258. $this->create($indexer, $index, $alias);
  259. $entities[] = $indexer->getEntity();
  260. $iterator = $indexer->getIterator();
  261. $indexTasks[] = [
  262. 'id' => Uuid::randomBytes(),
  263. '`entity`' => $indexer->getEntity(),
  264. '`index`' => $index,
  265. '`alias`' => $alias,
  266. '`doc_count`' => $iterator->fetchCount(),
  267. ];
  268. }
  269. $this->connection->executeStatement(
  270. 'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
  271. ['entities' => $entities],
  272. ['entities' => Connection::PARAM_STR_ARRAY]
  273. );
  274. foreach ($indexTasks as $task) {
  275. $this->connection->insert('admin_elasticsearch_index_task', $task);
  276. }
  277. }
  278. private function create(AbstractAdminIndexer $indexer, string $index, string $alias): void
  279. {
  280. $mapping = $indexer->mapping([
  281. 'properties' => [
  282. 'id' => ['type' => 'keyword'],
  283. 'textBoosted' => ['type' => 'text'],
  284. 'text' => ['type' => 'text'],
  285. 'entityName' => ['type' => 'keyword'],
  286. 'parameters' => ['type' => 'keyword'],
  287. ],
  288. ]);
  289. $mapping = array_merge_recursive($mapping, $this->mapping);
  290. $body = array_merge(
  291. $this->config,
  292. ['mappings' => $mapping]
  293. );
  294. $this->client->indices()->create([
  295. 'index' => $index,
  296. 'body' => $body,
  297. ]);
  298. $this->createAliasIfNotExisting($index, $alias);
  299. }
  300. private function indexExists(string $name): bool
  301. {
  302. return $this->client->indices()->exists(['index' => $name]);
  303. }
  304. private function aliasExists(string $alias): bool
  305. {
  306. return $this->client->indices()->existsAlias(['name' => $alias]);
  307. }
  308. /**
  309. * @param array<string, array<array<string, mixed>>> $result
  310. *
  311. * @return array<array{reason: string}|string>
  312. */
  313. private function parseErrors(array $result): array
  314. {
  315. $errors = [];
  316. foreach ($result['items'] as $item) {
  317. $item = $item['index'] ?? $item['delete'];
  318. if (\in_array($item['status'], [200, 201], true)) {
  319. continue;
  320. }
  321. $errors[] = [
  322. 'index' => $item['_index'],
  323. 'id' => $item['_id'],
  324. 'type' => $item['error']['type'] ?? $item['_type'],
  325. 'reason' => $item['error']['reason'] ?? $item['result'],
  326. ];
  327. }
  328. return $errors;
  329. }
  330. private function createAliasIfNotExisting(string $index, string $alias): void
  331. {
  332. $exist = $this->client->indices()->existsAlias(['name' => $alias]);
  333. if ($exist) {
  334. return;
  335. }
  336. $this->putAlias($index, $alias);
  337. }
  338. /**
  339. * @param array<string, string> $indices
  340. */
  341. private function swapAlias($indices): void
  342. {
  343. foreach ($indices as $alias => $index) {
  344. $exist = $this->client->indices()->existsAlias(['name' => $alias]);
  345. if (!$exist) {
  346. $this->putAlias($index, $alias);
  347. return;
  348. }
  349. $current = $this->client->indices()->getAlias(['name' => $alias]);
  350. if (!isset($current[$index])) {
  351. $this->putAlias($index, $alias);
  352. }
  353. unset($current[$index]);
  354. $current = array_keys($current);
  355. foreach ($current as $value) {
  356. $this->client->indices()->delete(['index' => $value]);
  357. }
  358. }
  359. }
  360. private function putAlias(string $index, string $alias): void
  361. {
  362. $this->client->indices()->refresh([
  363. 'index' => $index,
  364. ]);
  365. $this->client->indices()->putAlias(['index' => $index, 'name' => $alias]);
  366. }
  367. }