mirror of
https://github.com/wallabag/wallabag.git
synced 2025-01-11 01:15:26 +00:00
Merge pull request #6166 from wallabag/fix/controller-as-a-service-rabbitmq
Add `RabbitMQConsumerTotalProxy` to lazy RabbitMQ services for messages
This commit is contained in:
commit
b6d0fae856
12 changed files with 152 additions and 61 deletions
|
@ -5,6 +5,20 @@ services:
|
|||
autoconfigure: true
|
||||
public: true
|
||||
|
||||
Wallabag\ImportBundle\Consumer\RabbitMQConsumerTotalProxy:
|
||||
lazy: true
|
||||
arguments:
|
||||
$pocketConsumer: '@old_sound_rabbit_mq.import_pocket_consumer'
|
||||
$readabilityConsumer: '@old_sound_rabbit_mq.import_readability_consumer'
|
||||
$wallabagV1Consumer: '@old_sound_rabbit_mq.import_wallabag_v1_consumer'
|
||||
$wallabagV2Consumer: '@old_sound_rabbit_mq.import_wallabag_v2_consumer'
|
||||
$firefoxConsumer: '@old_sound_rabbit_mq.import_firefox_consumer'
|
||||
$chromeConsumer: '@old_sound_rabbit_mq.import_chrome_consumer'
|
||||
$instapaperConsumer: '@old_sound_rabbit_mq.import_instapaper_consumer'
|
||||
$pinboardConsumer: '@old_sound_rabbit_mq.import_pinboard_consumer'
|
||||
$deliciousConsumer: '@old_sound_rabbit_mq.import_delicious_consumer'
|
||||
$elcuratorConsumer: '@old_sound_rabbit_mq.import_elcurator_consumer'
|
||||
|
||||
wallabag_import.consumer.amqp.pocket:
|
||||
class: Wallabag\ImportBundle\Consumer\AMQPEntryConsumer
|
||||
arguments:
|
||||
|
|
|
@ -116,6 +116,7 @@
|
|||
"symfony/dom-crawler": "^4.0",
|
||||
"symfony/mailer": "^4.0",
|
||||
"symfony/monolog-bundle": "^3.1",
|
||||
"symfony/proxy-manager-bridge": "^4.4",
|
||||
"symfony/symfony": "^4.0",
|
||||
"tecnickcom/tcpdf": "^6.3.0",
|
||||
"twig/extra-bundle": "^3.4",
|
||||
|
|
14
composer.lock
generated
14
composer.lock
generated
|
@ -4,7 +4,7 @@
|
|||
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
|
||||
"This file is @generated automatically"
|
||||
],
|
||||
"content-hash": "158ed59536ca89df79f9fdca18270c92",
|
||||
"content-hash": "5eb04015c33ddf2996bb1d1c97b71daf",
|
||||
"packages": [
|
||||
{
|
||||
"name": "babdev/pagerfanta-bundle",
|
||||
|
@ -7585,16 +7585,16 @@
|
|||
},
|
||||
{
|
||||
"name": "phpseclib/phpseclib",
|
||||
"version": "3.0.17",
|
||||
"version": "3.0.18",
|
||||
"source": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/phpseclib/phpseclib.git",
|
||||
"reference": "dbc2307d5c69aeb22db136c52e91130d7f2ca761"
|
||||
"reference": "f28693d38ba21bb0d9f0c411ee5dae2b178201da"
|
||||
},
|
||||
"dist": {
|
||||
"type": "zip",
|
||||
"url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/dbc2307d5c69aeb22db136c52e91130d7f2ca761",
|
||||
"reference": "dbc2307d5c69aeb22db136c52e91130d7f2ca761",
|
||||
"url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/f28693d38ba21bb0d9f0c411ee5dae2b178201da",
|
||||
"reference": "f28693d38ba21bb0d9f0c411ee5dae2b178201da",
|
||||
"shasum": ""
|
||||
},
|
||||
"require": {
|
||||
|
@ -7675,7 +7675,7 @@
|
|||
],
|
||||
"support": {
|
||||
"issues": "https://github.com/phpseclib/phpseclib/issues",
|
||||
"source": "https://github.com/phpseclib/phpseclib/tree/3.0.17"
|
||||
"source": "https://github.com/phpseclib/phpseclib/tree/3.0.18"
|
||||
},
|
||||
"funding": [
|
||||
{
|
||||
|
@ -7691,7 +7691,7 @@
|
|||
"type": "tidelift"
|
||||
}
|
||||
],
|
||||
"time": "2022-10-24T10:51:50+00:00"
|
||||
"time": "2022-12-17T18:26:50+00:00"
|
||||
},
|
||||
{
|
||||
"name": "phpstan/phpdoc-parser",
|
||||
|
|
|
@ -9,7 +9,7 @@ use Lexik\Bundle\FormFilterBundle\Filter\FilterBuilderUpdaterInterface;
|
|||
use Pagerfanta\Doctrine\ORM\QueryAdapter as DoctrineORMAdapter;
|
||||
use Pagerfanta\Exception\OutOfRangeCurrentPageException;
|
||||
use Sensio\Bundle\FrameworkExtraBundle\Configuration\Cache;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
|
||||
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||
use Symfony\Component\HttpFoundation\RedirectResponse;
|
||||
use Symfony\Component\HttpFoundation\Request;
|
||||
|
@ -31,7 +31,7 @@ use Wallabag\CoreBundle\Helper\Redirect;
|
|||
use Wallabag\CoreBundle\Repository\EntryRepository;
|
||||
use Wallabag\CoreBundle\Repository\TagRepository;
|
||||
|
||||
class EntryController extends Controller
|
||||
class EntryController extends AbstractController
|
||||
{
|
||||
private EntityManagerInterface $entityManager;
|
||||
private EventDispatcherInterface $eventDispatcher;
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
namespace Wallabag\CoreBundle\Controller;
|
||||
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
|
||||
use Symfony\Component\HttpFoundation\Request;
|
||||
use Symfony\Component\HttpFoundation\Response;
|
||||
use Symfony\Component\HttpKernel\Exception\NotFoundHttpException;
|
||||
|
@ -16,7 +16,7 @@ use Wallabag\CoreBundle\Repository\TagRepository;
|
|||
* The try/catch can be removed once all formats will be implemented.
|
||||
* Still need implementation: txt.
|
||||
*/
|
||||
class ExportController extends Controller
|
||||
class ExportController extends AbstractController
|
||||
{
|
||||
/**
|
||||
* Gets one entry content.
|
||||
|
|
|
@ -7,7 +7,7 @@ use Pagerfanta\Doctrine\ORM\QueryAdapter as DoctrineORMAdapter;
|
|||
use Pagerfanta\Exception\OutOfRangeCurrentPageException;
|
||||
use Pagerfanta\Pagerfanta;
|
||||
use Sensio\Bundle\FrameworkExtraBundle\Configuration\ParamConverter;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
|
||||
use Symfony\Component\HttpFoundation\Request;
|
||||
use Symfony\Component\HttpFoundation\Response;
|
||||
use Symfony\Component\HttpKernel\Exception\BadRequestHttpException;
|
||||
|
@ -18,7 +18,7 @@ use Wallabag\CoreBundle\Helper\PreparePagerForEntries;
|
|||
use Wallabag\CoreBundle\Repository\EntryRepository;
|
||||
use Wallabag\UserBundle\Entity\User;
|
||||
|
||||
class FeedController extends Controller
|
||||
class FeedController extends AbstractController
|
||||
{
|
||||
private EntryRepository $entryRepository;
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
namespace Wallabag\CoreBundle\Controller;
|
||||
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
|
||||
use Symfony\Component\Form\Form;
|
||||
use Symfony\Component\HttpFoundation\RedirectResponse;
|
||||
use Symfony\Component\HttpFoundation\Request;
|
||||
|
@ -19,7 +19,7 @@ use Wallabag\CoreBundle\Repository\IgnoreOriginInstanceRuleRepository;
|
|||
*
|
||||
* @Route("/ignore-origin-instance-rules")
|
||||
*/
|
||||
class IgnoreOriginInstanceRuleController extends Controller
|
||||
class IgnoreOriginInstanceRuleController extends AbstractController
|
||||
{
|
||||
private EntityManagerInterface $entityManager;
|
||||
private TranslatorInterface $translator;
|
||||
|
|
|
@ -4,7 +4,7 @@ namespace Wallabag\CoreBundle\Controller;
|
|||
|
||||
use Craue\ConfigBundle\Util\Config;
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
|
||||
use Symfony\Component\Form\Form;
|
||||
use Symfony\Component\HttpFoundation\RedirectResponse;
|
||||
use Symfony\Component\HttpFoundation\Request;
|
||||
|
@ -22,7 +22,7 @@ use Wallabag\UserBundle\Entity\User;
|
|||
*
|
||||
* @Route("/site-credentials")
|
||||
*/
|
||||
class SiteCredentialController extends Controller
|
||||
class SiteCredentialController extends AbstractController
|
||||
{
|
||||
private EntityManagerInterface $entityManager;
|
||||
private TranslatorInterface $translator;
|
||||
|
|
|
@ -2,10 +2,10 @@
|
|||
|
||||
namespace Wallabag\CoreBundle\Controller;
|
||||
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
|
||||
use Symfony\Component\Routing\Annotation\Route;
|
||||
|
||||
class StaticController extends Controller
|
||||
class StaticController extends AbstractController
|
||||
{
|
||||
/**
|
||||
* @Route("/howto", name="howto")
|
||||
|
|
|
@ -7,7 +7,7 @@ use Doctrine\ORM\QueryBuilder;
|
|||
use Pagerfanta\Adapter\ArrayAdapter;
|
||||
use Pagerfanta\Exception\OutOfRangeCurrentPageException;
|
||||
use Sensio\Bundle\FrameworkExtraBundle\Configuration\ParamConverter;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
|
||||
use Symfony\Component\HttpFoundation\Request;
|
||||
use Symfony\Component\HttpFoundation\Response;
|
||||
use Symfony\Component\Routing\Annotation\Route;
|
||||
|
@ -21,7 +21,7 @@ use Wallabag\CoreBundle\Helper\TagsAssigner;
|
|||
use Wallabag\CoreBundle\Repository\EntryRepository;
|
||||
use Wallabag\CoreBundle\Repository\TagRepository;
|
||||
|
||||
class TagController extends Controller
|
||||
class TagController extends AbstractController
|
||||
{
|
||||
private EntityManagerInterface $entityManager;
|
||||
private TagsAssigner $tagsAssigner;
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
<?php
|
||||
|
||||
namespace Wallabag\ImportBundle\Consumer;
|
||||
|
||||
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
|
||||
|
||||
/**
|
||||
* A proxy class only used to count messages in a queue while lazy loading RabbitMQ services.
|
||||
* Only used in ImportController.
|
||||
*/
|
||||
class RabbitMQConsumerTotalProxy
|
||||
{
|
||||
private Consumer $pocketConsumer;
|
||||
private Consumer $readabilityConsumer;
|
||||
private Consumer $wallabagV1Consumer;
|
||||
private Consumer $wallabagV2Consumer;
|
||||
private Consumer $firefoxConsumer;
|
||||
private Consumer $chromeConsumer;
|
||||
private Consumer $instapaperConsumer;
|
||||
private Consumer $pinboardConsumer;
|
||||
private Consumer $deliciousConsumer;
|
||||
private Consumer $elcuratorConsumer;
|
||||
|
||||
public function __construct(Consumer $pocketConsumer, Consumer $readabilityConsumer, Consumer $wallabagV1Consumer, Consumer $wallabagV2Consumer, Consumer $firefoxConsumer, Consumer $chromeConsumer, Consumer $instapaperConsumer, Consumer $pinboardConsumer, Consumer $deliciousConsumer, Consumer $elcuratorConsumer)
|
||||
{
|
||||
$this->pocketConsumer = $pocketConsumer;
|
||||
$this->readabilityConsumer = $readabilityConsumer;
|
||||
$this->wallabagV1Consumer = $wallabagV1Consumer;
|
||||
$this->wallabagV2Consumer = $wallabagV2Consumer;
|
||||
$this->firefoxConsumer = $firefoxConsumer;
|
||||
$this->chromeConsumer = $chromeConsumer;
|
||||
$this->instapaperConsumer = $instapaperConsumer;
|
||||
$this->pinboardConsumer = $pinboardConsumer;
|
||||
$this->deliciousConsumer = $deliciousConsumer;
|
||||
$this->elcuratorConsumer = $elcuratorConsumer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Count message in RabbitMQ queue.
|
||||
*
|
||||
* It get one message without acking it (so it'll stay in the queue)
|
||||
* which will include the total of *other* messages in the queue.
|
||||
* Adding one to that messages will result in the full total message.
|
||||
*
|
||||
* @param string $importService The import service related: pocket, readability, wallabag_v1 or wallabag_v2
|
||||
*/
|
||||
public function getTotalMessage(string $importService): int
|
||||
{
|
||||
switch ($importService) {
|
||||
case 'pocket':
|
||||
$consumer = $this->pocketConsumer;
|
||||
break;
|
||||
case 'readability':
|
||||
$consumer = $this->readabilityConsumer;
|
||||
break;
|
||||
case 'wallabag_v1':
|
||||
$consumer = $this->wallabagV1Consumer;
|
||||
break;
|
||||
case 'wallabag_v2':
|
||||
$consumer = $this->wallabagV2Consumer;
|
||||
break;
|
||||
case 'firefox':
|
||||
$consumer = $this->firefoxConsumer;
|
||||
break;
|
||||
case 'chrome':
|
||||
$consumer = $this->chromeConsumer;
|
||||
break;
|
||||
case 'instapaper':
|
||||
$consumer = $this->instapaperConsumer;
|
||||
break;
|
||||
case 'pinboard':
|
||||
$consumer = $this->pinboardConsumer;
|
||||
break;
|
||||
case 'delicious':
|
||||
$consumer = $this->deliciousConsumer;
|
||||
break;
|
||||
case 'elcurator':
|
||||
$consumer = $this->elcuratorConsumer;
|
||||
break;
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
|
||||
$message = $consumer->getChannel()->basic_get('wallabag.import.' . $importService);
|
||||
|
||||
if (null === $message) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return $message->delivery_info['message_count'] + 1;
|
||||
}
|
||||
}
|
|
@ -7,17 +7,25 @@ use Predis\Client;
|
|||
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
|
||||
use Symfony\Component\Routing\Annotation\Route;
|
||||
use Symfony\Component\Security\Core\Authorization\AuthorizationCheckerInterface;
|
||||
use Wallabag\ImportBundle\Consumer\RabbitMQConsumerTotalProxy;
|
||||
use Wallabag\ImportBundle\Import\ImportChain;
|
||||
|
||||
class ImportController extends Controller
|
||||
{
|
||||
private RabbitMQConsumerTotalProxy $rabbitMQConsumerTotalProxy;
|
||||
|
||||
public function __construct(RabbitMQConsumerTotalProxy $rabbitMQConsumerTotalProxy)
|
||||
{
|
||||
$this->rabbitMQConsumerTotalProxy = $rabbitMQConsumerTotalProxy;
|
||||
}
|
||||
|
||||
/**
|
||||
* @Route("/", name="import")
|
||||
*/
|
||||
public function importAction()
|
||||
public function importAction(ImportChain $importChain)
|
||||
{
|
||||
return $this->render('@WallabagImport/Import/index.html.twig', [
|
||||
'imports' => $this->get(ImportChain::class)->getAll(),
|
||||
'imports' => $importChain->getAll(),
|
||||
]);
|
||||
}
|
||||
|
||||
|
@ -25,35 +33,35 @@ class ImportController extends Controller
|
|||
* Display how many messages are queue (both in Redis and RabbitMQ).
|
||||
* Only for admins.
|
||||
*/
|
||||
public function checkQueueAction()
|
||||
public function checkQueueAction(AuthorizationCheckerInterface $authorizationChecker, Config $craueConfig)
|
||||
{
|
||||
$nbRedisMessages = null;
|
||||
$nbRabbitMessages = null;
|
||||
$redisNotInstalled = false;
|
||||
$rabbitNotInstalled = false;
|
||||
|
||||
if (!$this->get(AuthorizationCheckerInterface::class)->isGranted('ROLE_SUPER_ADMIN')) {
|
||||
if (!$authorizationChecker->isGranted('ROLE_SUPER_ADMIN')) {
|
||||
return $this->render('@WallabagImport/Import/check_queue.html.twig');
|
||||
}
|
||||
|
||||
if ($this->get(Config::class)->get('import_with_rabbitmq')) {
|
||||
if ($craueConfig->get('import_with_rabbitmq')) {
|
||||
// in case rabbit is activated but not installed
|
||||
try {
|
||||
$nbRabbitMessages = $this->getTotalMessageInRabbitQueue('pocket')
|
||||
+ $this->getTotalMessageInRabbitQueue('readability')
|
||||
+ $this->getTotalMessageInRabbitQueue('wallabag_v1')
|
||||
+ $this->getTotalMessageInRabbitQueue('wallabag_v2')
|
||||
+ $this->getTotalMessageInRabbitQueue('firefox')
|
||||
+ $this->getTotalMessageInRabbitQueue('chrome')
|
||||
+ $this->getTotalMessageInRabbitQueue('instapaper')
|
||||
+ $this->getTotalMessageInRabbitQueue('pinboard')
|
||||
+ $this->getTotalMessageInRabbitQueue('delicious')
|
||||
+ $this->getTotalMessageInRabbitQueue('elcurator')
|
||||
$nbRabbitMessages = $this->rabbitMQConsumerTotalProxy->getTotalMessage('pocket')
|
||||
+ $this->rabbitMQConsumerTotalProxy->getTotalMessage('readability')
|
||||
+ $this->rabbitMQConsumerTotalProxy->getTotalMessage('wallabag_v1')
|
||||
+ $this->rabbitMQConsumerTotalProxy->getTotalMessage('wallabag_v2')
|
||||
+ $this->rabbitMQConsumerTotalProxy->getTotalMessage('firefox')
|
||||
+ $this->rabbitMQConsumerTotalProxy->getTotalMessage('chrome')
|
||||
+ $this->rabbitMQConsumerTotalProxy->getTotalMessage('instapaper')
|
||||
+ $this->rabbitMQConsumerTotalProxy->getTotalMessage('pinboard')
|
||||
+ $this->rabbitMQConsumerTotalProxy->getTotalMessage('delicious')
|
||||
+ $this->rabbitMQConsumerTotalProxy->getTotalMessage('elcurator')
|
||||
;
|
||||
} catch (\Exception $e) {
|
||||
$rabbitNotInstalled = true;
|
||||
}
|
||||
} elseif ($this->get(Config::class)->get('import_with_redis')) {
|
||||
} elseif ($craueConfig->get('import_with_redis')) {
|
||||
$redis = $this->get(Client::class);
|
||||
|
||||
try {
|
||||
|
@ -80,28 +88,4 @@ class ImportController extends Controller
|
|||
'rabbitNotInstalled' => $rabbitNotInstalled,
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Count message in RabbitMQ queue.
|
||||
* It get one message without acking it (so it'll stay in the queue)
|
||||
* which will include the total of *other* messages in the queue.
|
||||
* Adding one to that messages will result in the full total message.
|
||||
*
|
||||
* @param string $importService The import service related: pocket, readability, wallabag_v1 or wallabag_v2
|
||||
*
|
||||
* @return int
|
||||
*/
|
||||
private function getTotalMessageInRabbitQueue($importService)
|
||||
{
|
||||
$message = $this
|
||||
->get('old_sound_rabbit_mq.import_' . $importService . '_consumer')
|
||||
->getChannel()
|
||||
->basic_get('wallabag.import.' . $importService);
|
||||
|
||||
if (null === $message) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return $message->delivery_info['message_count'] + 1;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue