Skip to content

Commit

Permalink
added source code
Browse files Browse the repository at this point in the history
  • Loading branch information
petr_bilek committed Jun 1, 2017
1 parent f01ecde commit 9834033
Show file tree
Hide file tree
Showing 17 changed files with 1,035 additions and 0 deletions.
52 changes: 52 additions & 0 deletions src/Command/ConsumerCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php

namespace Sallyx\RabbitMqLogger\Command;

use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;
use Kdyby\RabbitMq\Connection;
use Kdyby\RabbitMq\Channel;
use PhpAmqpLib\Message\AMQPMessage;
use Sallyx\RabbitMqLogger\Model\Manager;
use Sallyx\RabbitMqLogger\Model\Message;

class ConsumerCommand extends ListMessagesCommand {

/** @var Manager */
private $manager;

public function __construct(Connection $connection, $prefix, array $config, Manager $manager) {
parent::__construct($connection, $prefix, $config);
$this->manager = $manager;
}

protected function configure() {
$qn = $this->config['queue']['name'];
$this->setName($this->prefix . ':save')
->setDescription('List, save and consume messages from a queue.')
->setHelp("List message(s) from a queue. " .
"Save them using \Sallyx\RabbitMqLogger\Model\Manager.\n" .
"Listeted and saved messages are deleted from the queue.")
->addArgument('queue', InputArgument::OPTIONAL, 'Queue name. (e.g. ' . $qn . ')')
->addOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit of consumed messages. Default ' . self::DEFAULT_LIMIT . '. (0 means no limit.)')
->addOption('width', 'w', InputOption::VALUE_REQUIRED, 'Console width. (Default 80)');
}

/**
* @param OutputInterface $output
* @param Channel $channel
* @param AMQPMessage $msg
* @return void
*/
protected function outputMessage(OutputInterface $output, Channel $channel, AMQPMessage $msg) {
try {
$this->manager->save(new Message($msg));
} catch (\Exception $e) {
$output->writeLn('<error>' . $e->getMessage() . '</error>');
throw $e;
}
parent::outputMessage($output, $channel, $msg);
}

}
124 changes: 124 additions & 0 deletions src/Command/ListMessagesCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php

namespace Sallyx\RabbitMqLogger\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;
use Kdyby\RabbitMq\Connection;
use Kdyby\RabbitMq\Channel;
use PhpAmqpLib\Message\AMQPMessage;

class ListMessagesCommand extends Command {

const DEFAULT_LIMIT = 1;

/** @var Connection */
private $connection;

/** @var string */
protected $prefix;

/** @var array */
protected $config;

/** @var boolean */
private $isSilenced = FALSE;

/** @var int */
private $consoleWidth = 80;

/**
* @param Connection $connection
* @param string $prefix
* @param array $config
*/
public function __construct(Connection $connection, $prefix, array $config) {
$this->prefix = rtrim($prefix, '.');
$this->config = $config;
parent::__construct();
$this->connection = $connection;
}

/**
* @return void
*/
protected function configure() {
$qn = $this->config['queue']['name'];
$this->setName($this->prefix . ':list')
->setDescription('List and consume messages from a queue.')
->setHelp("List message(s) from a queue. Listeted messages are deleted from the queue.")
->addArgument('queue', InputArgument::REQUIRED, 'Queue name. (e.g. ' . $qn . ')')
->addOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit of consumed messages. Default ' . self::DEFAULT_LIMIT . '. (0 means no limit.)')
->addOption('width', 'w', InputOption::VALUE_REQUIRED, 'Console width. (Default 80)');
}

/**
* @param OutputInterface $output
* @param Channel $channel
* @param AMQPMessage $msg
* @return void
*/
protected function outputMessage(OutputInterface $output, Channel $channel, AMQPMessage $msg) {
$message = \json_decode($msg->body);
if (!$this->isSilenced) {
$len = $this->consoleWidth - 51;
$ms = sprintf(
"%9.9s|%14s|%-25.25s|<info>%.{$len}s</info>", $message->priority, \date('y/m/d H:i', $message->timestamp), $message->guid, $message->errorText
);
$output->writeLn($ms);
}
$channel->basic_ack($msg->delivery_info['delivery_tag']);
}

/**
* @param InputInterface $input
* @param OutputInterface $output
* @return int
*/
protected function execute(InputInterface $input, OutputInterface $output) {
$limit = $input->getOption('limit');
if ($limit === NULL) {
$limit = self::DEFAULT_LIMIT;
}
$limit = \intval($limit);
$this->isSilenced = $input->getOption('quiet');
$cw = \intval($input->getOption('width'));
if ($cw > 0) {
$this->consoleWidth = $cw;
}
$queue = $input->getArgument('queue') ? : $this->config['queue']['name'];
$console = $this;
$channel = $this->connection->channel();
$callback = function($msg) use ($output, $console, $channel) {
$console->outputMessage($output, $channel, $msg);
};
try {
$c = $this->config;
if ($limit <= 0) {
$channel->basic_consume($queue, '', false, false, false, false, $callback);
while (\count($channel->callbacks)) {
$channel->wait();
}
} else {
$channel->basic_qos(null, 1, null);
do {
$msg = $channel->basic_get($queue, false);
if ($msg == NULL) {
break;
}
$this->outputMessage($output, $channel, $msg);
} while (--$limit > 0);
}
$channel->close();
$this->connection->close();
return 0;
} catch (\Exception $e) {
$output->writeLn('<error>' . $e->getMessage() . '</error>');
return 1;
}
}

}
74 changes: 74 additions & 0 deletions src/Command/QueueCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

namespace Sallyx\RabbitMqLogger\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;
use Kdyby\RabbitMq\Connection;

class QueueCommand extends Command {

/** @var Connection */
private $connection;

/** @var string */
protected $prefix;

/** @var array */
protected $config;

/**
* @param Connection $connection
* @param string $prefix
* @param array $config
*/
public function __construct(Connection $connection, $prefix, array $config) {
$this->prefix = rtrim($prefix, '.');
$this->config = $config;
parent::__construct();
$this->connection = $connection;
}

/**
* @return void
*/
protected function configure() {
$qn = $this->config['queue']['name'];
$this->setName($this->prefix . ':queue')
->setDescription('Add or delete a queue.')
->setHelp("Create a queue and bind it to exchange or delete a queue.")
->addArgument('queue', InputArgument::REQUIRED, 'Queue name. (e.g. ' . $qn . ')')
->addArgument('bindingKey', InputArgument::OPTIONAL, 'Binding key.')
->addOption('delete', 'd', InputOption::VALUE_NONE, 'Delete queue instead of create.');
}

/**
* @param InputInterface $input
* @param OutputInterface $output
* @return int
*/
protected function execute(InputInterface $input, OutputInterface $output) {
$delete = $input->getOption('delete');
$bindingKey = $input->getArgument('bindingKey') ? : '';
$queue = $input->getArgument('queue');
$exchangeName = $this->config['exchange']['name'];
try {
$channel = $this->connection->channel();
if (!$delete) {
$channel->queue_declare($queue, false, true, false, false);
$channel->queue_bind($queue, $exchangeName, $bindingKey, true);
$channel->close();
$this->connection->close();
return 0;
}
$channel->queue_delete($queue);
} catch (\Exception $e) {
$output->writeLn('<error>' . $e->getMessage() . '</error>');
return 1;
}
}

}
111 changes: 111 additions & 0 deletions src/DI/ConsumerExtension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
<?php

namespace Sallyx\RabbitMqLogger\DI;

use Nette\DI\CompilerExtension;
use Kdyby\RabbitMq\DI\IConsumersProvider;

class ConsumerExtension extends CompilerExtension implements IConsumersProvider {

const DEFAULT_MANAGER = 'Sallyx\RabbitMqLogger\Model\Doctrine\Manager';
const DEFAULT_GRID_FACTORY = 'Sallyx\RabbitMqLogger\Controls\Doctrine\GridFactory';

/**
* @var array
*/
private $defaults = array(
'rabbitmqExtensionName' => 'rabbitmq',
'consumerName' => 'rabbitLogger',
'consumer' => array(
'connection' => 'default',
'contentType' => 'application/json',
'queue' => array(
'name' => 'nette-log-queue'
),
'exchange' => array(
'name' => 'nette-log-exchange',
'type' => 'fanout',
),
'callback' => array(
'@Sallyx\RabbitMqLogger\Model\ConsumerCallback',
'save'
)
),
'exceptionFileRoute' => array(
'route' => 'get-tracy-exception-file',
'secret' => null
),
'manager' => self::DEFAULT_MANAGER
);

/**
* @return array
*/
public function getRabbitConsumers() {
$config = $this->getConfig($this->defaults);
return [$config['consumerName'] => $config['consumer']];
}

/**
* @throws \Nette\Utils\AssertionException
* @return void
*/
public function loadConfiguration() {
$config = $this->getConfig($this->defaults);
if ($config['manager'] === self::DEFAULT_MANAGER) {
$ext = $this->compiler->getExtensions('Kdyby\Doctrine\DI\OrmExtension');
if (empty($ext)) {
throw new Nette\Utils\AssertionException('You should register \'Kdyby\Doctrine\DI\OrmExtension\' before \'' . get_class($this) . '\'.');
}
$orm = \array_pop($ext);
$ns = 'Sallyx\RabbitMqLogger\Model\Doctrine';
$dir = __DIR__ . '/../model/doctrine/entity/';
if (!empty($orm->config['metadata'])) {
$orm->config['metadata'][$ns] = $dir;
} else {
$orm->config['metadata'] = [$ns => $dir];
}
$builder = $this->getContainerBuilder();
$builder->addDefinition($this->prefix('manager'))
->setClass(self::DEFAULT_MANAGER);
$builder->addDefinition($this->prefix('consumerCallback'))
->setClass(trim($config['consumer']['callback'][0], '@'));

$gfClass = $builder->addDefinition($this->prefix('gridFactory'))
->setClass(self::DEFAULT_GRID_FACTORY, ['exConfig' => $config['exceptionFileRoute']]);
}
$this->loadConsole();
}

/**
* @return void
*/
private function loadConsole() {
if (!class_exists('Kdyby\Console\DI\ConsoleExtension') || PHP_SAPI !== 'cli') {
return;
}

$config = $this->getConfig($this->defaults);
$builder = $this->getContainerBuilder();
$connectionLink = '@' . $config['rabbitmqExtensionName'] . '.' . $config['consumer']['connection'] . '.connection';
$managerLink = '@' . $builder->getByType($config['manager']);
//ListMessagesCommand
$class1 = 'Sallyx\RabbitMqLogger\Command\ListMessagesCommand';
$builder->addDefinition($this->prefix('console.list'))
->setClass($class1, [$connectionLink, $this->prefix(''), $config['consumer'], $managerLink])
->addTag(\Kdyby\Console\DI\ConsoleExtension::COMMAND_TAG);
//QueueCommand
$class2 = 'Sallyx\RabbitMqLogger\Command\QueueCommand';
$builder->addDefinition($this->prefix('console.queue'))
->setClass($class2, [$connectionLink, $this->prefix(''), $config['consumer'], $managerLink])
->addTag(\Kdyby\Console\DI\ConsoleExtension::COMMAND_TAG);
//ConsumerCommand
if ($config['manager'] === self::DEFAULT_MANAGER) {
$class3 = 'Sallyx\RabbitMqLogger\Command\ConsumerCommand';
$builder->addDefinition($this->prefix('console.save'))
->setClass($class3, [$connectionLink, $this->prefix(''), $config['consumer'], $managerLink])
->addTag(\Kdyby\Console\DI\ConsoleExtension::COMMAND_TAG);
}
}

}

0 comments on commit 9834033

Please sign in to comment.