vendor/shopware/core/Content/Product/DataAbstractionLayer/StockUpdater.php line 96

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Content\Product\DataAbstractionLayer;
  3. use Doctrine\DBAL\Connection;
  4. use Shopware\Core\Checkout\Cart\Event\CheckoutOrderPlacedEvent;
  5. use Shopware\Core\Checkout\Cart\LineItem\LineItem;
  6. use Shopware\Core\Checkout\Order\Aggregate\OrderLineItem\OrderLineItemDefinition;
  7. use Shopware\Core\Checkout\Order\OrderEvents;
  8. use Shopware\Core\Checkout\Order\OrderStates;
  9. use Shopware\Core\Content\Product\DataAbstractionLayer\StockUpdate\StockUpdateFilterProvider;
  10. use Shopware\Core\Content\Product\Events\ProductNoLongerAvailableEvent;
  11. use Shopware\Core\Defaults;
  12. use Shopware\Core\Framework\Context;
  13. use Shopware\Core\Framework\DataAbstractionLayer\Doctrine\RetryableQuery;
  14. use Shopware\Core\Framework\DataAbstractionLayer\EntityWriteResult;
  15. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenEvent;
  16. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\ChangeSetAware;
  17. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\DeleteCommand;
  18. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\WriteCommand;
  19. use Shopware\Core\Framework\DataAbstractionLayer\Write\Validation\PreWriteValidationEvent;
  20. use Shopware\Core\Framework\Log\Package;
  21. use Shopware\Core\Framework\Uuid\Uuid;
  22. use Shopware\Core\Profiling\Profiler;
  23. use Shopware\Core\System\StateMachine\Event\StateMachineTransitionEvent;
  24. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  25. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  26. /**
  27. * @deprecated tag:v6.5.0 - reason:becomes-internal - EventSubscribers will become internal in v6.5.0
  28. */
  29. #[Package('core')]
  30. class StockUpdater implements EventSubscriberInterface
  31. {
  32. private Connection $connection;
  33. private EventDispatcherInterface $dispatcher;
  34. private StockUpdateFilterProvider $stockUpdateFilter;
  35. /**
  36. * @internal
  37. */
  38. public function __construct(
  39. Connection $connection,
  40. EventDispatcherInterface $dispatcher,
  41. StockUpdateFilterProvider $stockUpdateFilter
  42. ) {
  43. $this->connection = $connection;
  44. $this->dispatcher = $dispatcher;
  45. $this->stockUpdateFilter = $stockUpdateFilter;
  46. }
  47. /**
  48. * Returns a list of custom business events to listen where the product maybe changed
  49. *
  50. * @return array<string, string|array{0: string, 1: int}|list<array{0: string, 1?: int}>>
  51. */
  52. public static function getSubscribedEvents()
  53. {
  54. return [
  55. CheckoutOrderPlacedEvent::class => 'orderPlaced',
  56. StateMachineTransitionEvent::class => 'stateChanged',
  57. PreWriteValidationEvent::class => 'triggerChangeSet',
  58. OrderEvents::ORDER_LINE_ITEM_WRITTEN_EVENT => 'lineItemWritten',
  59. OrderEvents::ORDER_LINE_ITEM_DELETED_EVENT => 'lineItemWritten',
  60. ];
  61. }
  62. public function triggerChangeSet(PreWriteValidationEvent $event): void
  63. {
  64. if ($event->getContext()->getVersionId() !== Defaults::LIVE_VERSION) {
  65. return;
  66. }
  67. foreach ($event->getCommands() as $command) {
  68. if (!$command instanceof ChangeSetAware) {
  69. continue;
  70. }
  71. /** @var ChangeSetAware&WriteCommand $command */
  72. if ($command->getDefinition()->getEntityName() !== OrderLineItemDefinition::ENTITY_NAME) {
  73. continue;
  74. }
  75. if ($command instanceof DeleteCommand) {
  76. $command->requestChangeSet();
  77. continue;
  78. }
  79. /** @var WriteCommand&ChangeSetAware $command */
  80. if ($command->hasField('referenced_id') || $command->hasField('product_id') || $command->hasField('quantity')) {
  81. $command->requestChangeSet();
  82. }
  83. }
  84. }
  85. public function orderPlaced(CheckoutOrderPlacedEvent $event): void
  86. {
  87. $ids = [];
  88. foreach ($event->getOrder()->getLineItems() ?? [] as $lineItem) {
  89. if ($lineItem->getType() !== LineItem::PRODUCT_LINE_ITEM_TYPE) {
  90. continue;
  91. }
  92. $referencedId = $lineItem->getReferencedId();
  93. if (!$referencedId) {
  94. continue;
  95. }
  96. if (!\array_key_exists($referencedId, $ids)) {
  97. $ids[$referencedId] = 0;
  98. }
  99. $ids[$referencedId] += $lineItem->getQuantity();
  100. }
  101. $filteredIds = $this->stockUpdateFilter->filterProductIdsForStockUpdates(\array_keys($ids), $event->getContext());
  102. $ids = \array_filter($ids, static fn (string $id) => \in_array($id, $filteredIds, true), \ARRAY_FILTER_USE_KEY);
  103. // order placed event is a high load event. Because of the high load, we simply reduce the quantity here instead of executing the high costs `update` function
  104. $query = new RetryableQuery(
  105. $this->connection,
  106. $this->connection->prepare('UPDATE product SET available_stock = available_stock - :quantity WHERE id = :id')
  107. );
  108. Profiler::trace('order::update-stock', static function () use ($query, $ids): void {
  109. foreach ($ids as $id => $quantity) {
  110. $query->execute(['id' => Uuid::fromHexToBytes((string) $id), 'quantity' => $quantity]);
  111. }
  112. });
  113. Profiler::trace('order::update-flag', function () use ($ids, $event): void {
  114. $this->updateAvailableFlag(\array_keys($ids), $event->getContext());
  115. });
  116. }
  117. /**
  118. * If the product of an order item changed, the stocks of the old product and the new product must be updated.
  119. */
  120. public function lineItemWritten(EntityWrittenEvent $event): void
  121. {
  122. $ids = [];
  123. // we don't want to trigger to `update` method when we are inside the order process
  124. if ($event->getContext()->hasState('checkout-order-route')) {
  125. return;
  126. }
  127. foreach ($event->getWriteResults() as $result) {
  128. if ($result->hasPayload('referencedId') && $result->getProperty('type') === LineItem::PRODUCT_LINE_ITEM_TYPE) {
  129. $ids[] = $result->getProperty('referencedId');
  130. }
  131. if ($result->getOperation() === EntityWriteResult::OPERATION_INSERT) {
  132. continue;
  133. }
  134. $changeSet = $result->getChangeSet();
  135. if (!$changeSet) {
  136. continue;
  137. }
  138. $type = $changeSet->getBefore('type');
  139. if ($type !== LineItem::PRODUCT_LINE_ITEM_TYPE) {
  140. continue;
  141. }
  142. if (!$changeSet->hasChanged('referenced_id') && !$changeSet->hasChanged('quantity')) {
  143. continue;
  144. }
  145. $ids[] = $changeSet->getBefore('referenced_id');
  146. $ids[] = $changeSet->getAfter('referenced_id');
  147. }
  148. $ids = array_filter(array_unique($ids));
  149. if (empty($ids)) {
  150. return;
  151. }
  152. $this->update($ids, $event->getContext());
  153. }
  154. public function stateChanged(StateMachineTransitionEvent $event): void
  155. {
  156. if ($event->getContext()->getVersionId() !== Defaults::LIVE_VERSION) {
  157. return;
  158. }
  159. if ($event->getEntityName() !== 'order') {
  160. return;
  161. }
  162. if ($event->getToPlace()->getTechnicalName() === OrderStates::STATE_COMPLETED) {
  163. $this->decreaseStock($event);
  164. return;
  165. }
  166. if ($event->getFromPlace()->getTechnicalName() === OrderStates::STATE_COMPLETED) {
  167. $this->increaseStock($event);
  168. return;
  169. }
  170. if ($event->getToPlace()->getTechnicalName() === OrderStates::STATE_CANCELLED || $event->getFromPlace()->getTechnicalName() === OrderStates::STATE_CANCELLED) {
  171. $products = $this->getProductsOfOrder($event->getEntityId(), $event->getContext());
  172. $ids = array_column($products, 'referenced_id');
  173. $this->updateAvailableStockAndSales($ids, $event->getContext());
  174. $this->updateAvailableFlag($ids, $event->getContext());
  175. }
  176. }
  177. /**
  178. * @param list<string> $ids
  179. */
  180. public function update(array $ids, Context $context): void
  181. {
  182. if ($context->getVersionId() !== Defaults::LIVE_VERSION) {
  183. return;
  184. }
  185. $ids = $this->stockUpdateFilter->filterProductIdsForStockUpdates($ids, $context);
  186. $this->updateAvailableStockAndSales($ids, $context);
  187. $this->updateAvailableFlag($ids, $context);
  188. }
  189. private function increaseStock(StateMachineTransitionEvent $event): void
  190. {
  191. $products = $this->getProductsOfOrder($event->getEntityId(), $event->getContext());
  192. $ids = array_column($products, 'referenced_id');
  193. $this->updateStock($products, +1);
  194. $this->updateAvailableStockAndSales($ids, $event->getContext());
  195. $this->updateAvailableFlag($ids, $event->getContext());
  196. }
  197. private function decreaseStock(StateMachineTransitionEvent $event): void
  198. {
  199. $products = $this->getProductsOfOrder($event->getEntityId(), $event->getContext());
  200. $ids = array_column($products, 'referenced_id');
  201. $this->updateStock($products, -1);
  202. $this->updateAvailableStockAndSales($ids, $event->getContext());
  203. $this->updateAvailableFlag($ids, $event->getContext());
  204. }
  205. /**
  206. * @param list<string> $ids
  207. */
  208. private function updateAvailableStockAndSales(array $ids, Context $context): void
  209. {
  210. $ids = array_filter(array_keys(array_flip($ids)));
  211. if (empty($ids)) {
  212. return;
  213. }
  214. $sql = '
  215. SELECT LOWER(HEX(order_line_item.product_id)) as product_id,
  216. IFNULL(
  217. SUM(IF(state_machine_state.technical_name = :completed_state, 0, order_line_item.quantity)),
  218. 0
  219. ) as open_quantity,
  220. IFNULL(
  221. SUM(IF(state_machine_state.technical_name = :completed_state, order_line_item.quantity, 0)),
  222. 0
  223. ) as sales_quantity
  224. FROM order_line_item
  225. INNER JOIN `order`
  226. ON `order`.id = order_line_item.order_id
  227. AND `order`.version_id = order_line_item.order_version_id
  228. INNER JOIN state_machine_state
  229. ON state_machine_state.id = `order`.state_id
  230. AND state_machine_state.technical_name <> :cancelled_state
  231. WHERE order_line_item.product_id IN (:ids)
  232. AND order_line_item.type = :type
  233. AND order_line_item.version_id = :version
  234. AND order_line_item.product_id IS NOT NULL
  235. GROUP BY product_id;
  236. ';
  237. $rows = $this->connection->fetchAllAssociative(
  238. $sql,
  239. [
  240. 'type' => LineItem::PRODUCT_LINE_ITEM_TYPE,
  241. 'version' => Uuid::fromHexToBytes($context->getVersionId()),
  242. 'completed_state' => OrderStates::STATE_COMPLETED,
  243. 'cancelled_state' => OrderStates::STATE_CANCELLED,
  244. 'ids' => Uuid::fromHexToBytesList($ids),
  245. ],
  246. [
  247. 'ids' => Connection::PARAM_STR_ARRAY,
  248. ]
  249. );
  250. $fallback = array_column($rows, 'product_id');
  251. $fallback = array_diff($ids, $fallback);
  252. $update = new RetryableQuery(
  253. $this->connection,
  254. $this->connection->prepare('UPDATE product SET available_stock = stock - :open_quantity, sales = :sales_quantity, updated_at = :now WHERE id = :id')
  255. );
  256. foreach ($fallback as $id) {
  257. $update->execute([
  258. 'id' => Uuid::fromHexToBytes((string) $id),
  259. 'open_quantity' => 0,
  260. 'sales_quantity' => 0,
  261. 'now' => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
  262. ]);
  263. }
  264. foreach ($rows as $row) {
  265. $update->execute([
  266. 'id' => Uuid::fromHexToBytes($row['product_id']),
  267. 'open_quantity' => $row['open_quantity'],
  268. 'sales_quantity' => $row['sales_quantity'],
  269. 'now' => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
  270. ]);
  271. }
  272. }
  273. /**
  274. * @param list<string> $ids
  275. */
  276. private function updateAvailableFlag(array $ids, Context $context): void
  277. {
  278. $ids = array_filter(array_unique($ids));
  279. if (empty($ids)) {
  280. return;
  281. }
  282. $bytes = Uuid::fromHexToBytesList($ids);
  283. $sql = '
  284. UPDATE product
  285. LEFT JOIN product parent
  286. ON parent.id = product.parent_id
  287. AND parent.version_id = product.version_id
  288. SET product.available = IFNULL((
  289. IFNULL(product.is_closeout, parent.is_closeout) * product.available_stock
  290. >=
  291. IFNULL(product.is_closeout, parent.is_closeout) * IFNULL(product.min_purchase, parent.min_purchase)
  292. ), 0)
  293. WHERE product.id IN (:ids)
  294. AND product.version_id = :version
  295. ';
  296. RetryableQuery::retryable($this->connection, function () use ($sql, $context, $bytes): void {
  297. $this->connection->executeStatement(
  298. $sql,
  299. ['ids' => $bytes, 'version' => Uuid::fromHexToBytes($context->getVersionId())],
  300. ['ids' => Connection::PARAM_STR_ARRAY]
  301. );
  302. });
  303. $updated = $this->connection->fetchFirstColumn(
  304. 'SELECT LOWER(HEX(id)) FROM product WHERE available = 0 AND id IN (:ids) AND product.version_id = :version',
  305. ['ids' => $bytes, 'version' => Uuid::fromHexToBytes($context->getVersionId())],
  306. ['ids' => Connection::PARAM_STR_ARRAY]
  307. );
  308. if (!empty($updated)) {
  309. $this->dispatcher->dispatch(new ProductNoLongerAvailableEvent($updated, $context));
  310. }
  311. }
  312. /**
  313. * @param list<array{referenced_id: string, quantity: string}> $products
  314. */
  315. private function updateStock(array $products, int $multiplier): void
  316. {
  317. $query = new RetryableQuery(
  318. $this->connection,
  319. $this->connection->prepare('UPDATE product SET stock = stock + :quantity WHERE id = :id AND version_id = :version')
  320. );
  321. foreach ($products as $product) {
  322. $query->execute([
  323. 'quantity' => (int) $product['quantity'] * $multiplier,
  324. 'id' => Uuid::fromHexToBytes($product['referenced_id']),
  325. 'version' => Uuid::fromHexToBytes(Defaults::LIVE_VERSION),
  326. ]);
  327. }
  328. }
  329. /**
  330. * @return list<array{referenced_id: string, quantity: string}>
  331. */
  332. private function getProductsOfOrder(string $orderId, Context $context): array
  333. {
  334. $query = $this->connection->createQueryBuilder();
  335. $query->select(['referenced_id', 'quantity']);
  336. $query->from('order_line_item');
  337. $query->andWhere('type = :type');
  338. $query->andWhere('order_id = :id');
  339. $query->andWhere('version_id = :version');
  340. $query->setParameter('id', Uuid::fromHexToBytes($orderId));
  341. $query->setParameter('version', Uuid::fromHexToBytes(Defaults::LIVE_VERSION));
  342. $query->setParameter('type', LineItem::PRODUCT_LINE_ITEM_TYPE);
  343. /** @var list<array{referenced_id: string, quantity: string}> $result */
  344. $result = $query->executeQuery()->fetchAllAssociative();
  345. $filteredIds = $this->stockUpdateFilter->filterProductIdsForStockUpdates(\array_column($result, 'referenced_id'), $context);
  346. return \array_filter($result, static fn (array $item) => \in_array($item['referenced_id'], $filteredIds, true));
  347. }
  348. }