thinkphp6使用rabbitMq队列
thinkphp6 使用 rabbitMq
·
改编自 GitHub - sonhineboy/tp-queue: ThinkPHP 队列支持
thinkphp6 版本 v6.0.9
think-queue 版本 3.0
RabbitMq.php 类 放在 think-queue/src/queue/job 目录下
<?php
namespace think\queue\job;
use think\App;
use think\helper\Str;
use think\queue\Job;
use think\queue\connector\RabbitMq as RabbitMQQueue;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpMessage;
use Exception;
class RabbitMq extends Job
{
public const ATTEMPT_COUNT_HEADERS_KEY = 'attempts_count';
protected $connection;
protected $consumer;
protected $message;
public function __construct(App $app,RabbitMQQueue $connection, AmqpConsumer $consumer, AmqpMessage $message)
{
$this->connection = $connection;
$this->consumer = $consumer;
$this->message = $message;
$this->queue = $consumer->getQueue()->getQueueName();
$this->app = $app;
}
public function fire()
{
$payload = $this->payload();
list($class, $method) = $this->parseJob($payload['job']);
$this->instance = $this->resolve($class,'');
if ($this->instance) {
$this->instance->{$method}($this, $payload['data']);
}
}
public function parseJob($job)
{
$segments = explode('@', $job);
return count($segments) > 1 ? $segments : [$segments[0], 'fire'];
}
public function attempts(): int
{
// set default job attempts to 1 so that jobs can run without retry
$defaultAttempts = 1;
return $this->message->getProperty(self::ATTEMPT_COUNT_HEADERS_KEY, $defaultAttempts);
}
public function getRawBody(): string
{
return $this->message->getBody();
}
public function delete()
{
parent::delete();
$this->consumer->acknowledge($this->message);
}
public function release($delay = 0)
{
parent::release($delay);
$this->delete();
$body = $this->payload();
/*
* Some jobs don't have the command set, so fall back to just sending it the job name string
*/
if (isset($body['data']['command']) === true) {
$job = $this->unserialize($body);
} else {
$job = $this->getName();
}
$data = $body['data'];
$this->connection->release($delay, $job, $data, $this->getQueue(), $this->attempts() + 1);
}
/**
* Get the decoded body of the job.
*
* @return array
*/
public function payload($name='',$default=''): array
{
return json_decode($this->getRawBody(), true);
}
/**
* Unserialize job.
*
* @param array $body
*
* @throws Exception
*
* @return mixed
*/
protected function unserialize(array $body)
{
try {
/* @noinspection UnserializeExploitsInspection */
return unserialize($body['data']['command']);
} catch (Exception $exception) {
if (
$this->causedByDeadlock($exception) ||
Str::contains($exception->getMessage(), ['detected deadlock'])
) {
sleep(2);
return $this->unserialize($body);
}
throw $exception;
}
}
/**
* Determine if the given exception was caused by a deadlock.
*
* @param \Exception $e
* @return bool
*/
protected function causedByDeadlock(Exception $e)
{
$message = $e->getMessage();
return Str::contains($message, [
'Deadlock found when trying to get lock',
'deadlock detected',
'The database file is locked',
'database is locked',
'database table is locked',
'A table in the database is locked',
'has been chosen as the deadlock victim',
'Lock wait timeout exceeded; try restarting transaction',
'WSREP detected deadlock/conflict and aborted the transaction. Try restarting the transaction',
]);
}
public function getJobId()
{
return $this->decoded['id'] ?? null;
}
}
创建类 RabbitMq.php (跟上面的类同名但不在同一个文件夹)放在 think-queue/src/queue/connector 目录下
<?php
namespace think\queue\connector;
use Enqueue\AmqpLib\AmqpConnectionFactory;
use Enqueue\AmqpLib\AmqpContext;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\AmqpQueue;
use Interop\Amqp\AmqpTopic;
use Interop\Amqp\Impl\AmqpBind;
use think\facade\Log;
use think\helper\Str;
use think\queue\Connector;
use think\queue\job\RabbitMq as RabbitMQJob;
class RabbitMq extends Connector
{
/* @var AmqpContext */
protected $context;
protected $options = [
'dsn' => '',
'host' => '127.0.0.1',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/',
'ssl_params' => [
'ssl_on' => false,
'cafile' => null,
'local_cert' => null,
'local_key' => null,
'verify_peer' => true,
'passphrase' => null,
],
];
protected $queueName;
protected $queueOptions;
protected $exchangeOptions;
protected $declaredExchanges = [];
protected $declaredQueues = [];
protected $sleepOnError;
public function __construct(array $options)
{
if (!empty($options)) {
$this->options = array_merge($this->options, $options);
}
$factory = new AmqpConnectionFactory([
'dsn' => $this->options['dsn'],
'host' => $this->options['host'],
'port' => $this->options['port'],
'user' => $this->options['login'],
'pass' => $this->options['password'],
'vhost' => $this->options['vhost'],
'ssl_on' => $this->options['ssl_params']['ssl_on'],
'ssl_verify' => $this->options['ssl_params']['verify_peer'],
'ssl_cacert' => $this->options['ssl_params']['cafile'],
'ssl_cert' => $this->options['ssl_params']['local_cert'],
'ssl_key' => $this->options['ssl_params']['local_key'],
'ssl_passphrase' => $this->options['ssl_params']['passphrase'],
]);
if ($factory instanceof DelayStrategyAware) {
$factory->setDelayStrategy(new RabbitMqDlxDelayStrategy());
}
$this->context = $factory->createContext();
$this->queueName = $this->options['queue'] ?? $this->options['options']['queue']['name'];
$this->queueOptions = $this->options['options']['queue'];
$this->queueOptions['arguments'] = isset($this->queueOptions['arguments']) ?
json_decode($this->queueOptions['arguments'], true) : [];
$this->exchangeOptions = $this->options['options']['exchange'];
$this->exchangeOptions['arguments'] = isset($this->exchangeOptions['arguments']) ?
json_decode($this->exchangeOptions['arguments'], true) : [];
$this->sleepOnError = $this->options['sleep_on_error'] ?? 5;
}
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data, $queue), $queue, []);
}
public function pushRaw($payload, $queueName = null, array $options = [])
{
try {
$this->options = array_merge($this->options,$options);
/**
* @var AmqpTopic
* @var AmqpQueue $queue
*/
[$queue, $topic] = $this->declareEverything($queueName);
$message = $this->context->createMessage($payload);
$message->setRoutingKey($queue->getQueueName());
$message->setCorrelationId(uniqid('', true));
$message->setContentType('application/json');
$message->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT);
if (isset($this->options['headers'])) {
$message->setHeaders($this->options['headers']);
}
if (isset($this->options['properties'])) {
$message->setProperties($this->options['properties']);
}
if (isset($this->options['attempts'])) {
$message->setProperty(RabbitMQJob::ATTEMPT_COUNT_HEADERS_KEY, $this->options['attempts']);
}
$producer = $this->context->createProducer();
if (isset($options['delay']) && $options['delay'] > 0) {
$producer->setDeliveryDelay($options['delay'] * 1000);
}
$producer->send($topic, $message);
return $message->getCorrelationId();
} catch (\Exception $exception) {
$this->reportConnectionError('pushRaw', $exception);
return;
}
}
public function later($delay, $job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job,$data, $queue), $queue, ['delay' => $delay]);
}
public function pop($queueName = null)
{
try {
/** @var AmqpQueue $queue */
[$queue] = $this->declareEverything($queueName);
$consumer = $this->context->createConsumer($queue);
if ($message = $consumer->receiveNoWait()) {
return new RabbitMQJob(app(),$this, $consumer, $message);
}
} catch (\Throwable $exception) {
$this->reportConnectionError('pop', $exception);
return;
}
}
/**
* Release a reserved job back onto the queue.
*
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string|object $job
* @param mixed $data
* @param string $queue
* @param int $attempts
* @return mixed
*/
public function release($delay, $job, $data, $queue, $attempts = 0)
{
return $this->pushRaw($this->createPayload($job, $data, $queue), $queue, [
'delay' => $delay,
'attempts' => $attempts,
]);
}
/**
* Create a payload string from the given job and data.
*
* @param string $job
* @param string $queue
* @param mixed $data
* @return string
*/
protected function createPayload($job, $data = '', $queue = null)
{
$payload = $this->setMeta(
parent::createPayload($job, $data), 'id', $this->getRandomId()
);
return $this->setMeta($payload, 'attempts', 1);
}
/**
* 随机id
*
* @return string
*/
protected function getRandomId()
{
return Str::random(32);
}
/**
* @param string $queueName
*
* @return array [Interop\Amqp\AmqpQueue, Interop\Amqp\AmqpTopic]
*/
protected function declareEverything(string $queueName = null)
{
$queueName = $this->getQueueName($queueName);
$exchangeName = $this->exchangeOptions['name'] ?: $queueName;
$topic = $this->context->createTopic($exchangeName);
$topic->setType($this->exchangeOptions['type']);
$topic->setArguments($this->exchangeOptions['arguments']);
if ($this->exchangeOptions['passive']) {
$topic->addFlag(AmqpTopic::FLAG_PASSIVE);
}
if ($this->exchangeOptions['durable']) {
$topic->addFlag(AmqpTopic::FLAG_DURABLE);
}
if ($this->exchangeOptions['auto_delete']) {
$topic->addFlag(AmqpTopic::FLAG_AUTODELETE);
}
if ($this->exchangeOptions['declare'] && ! in_array($exchangeName, $this->declaredExchanges, true)) {
$this->context->declareTopic($topic);
$this->declaredExchanges[] = $exchangeName;
}
$queue = $this->context->createQueue($queueName);
$queue->setArguments($this->queueOptions['arguments']);
if ($this->queueOptions['passive']) {
$queue->addFlag(AmqpQueue::FLAG_PASSIVE);
}
if ($this->queueOptions['durable']) {
$queue->addFlag(AmqpQueue::FLAG_DURABLE);
}
if ($this->queueOptions['exclusive']) {
$queue->addFlag(AmqpQueue::FLAG_EXCLUSIVE);
}
if ($this->queueOptions['auto_delete']) {
$queue->addFlag(AmqpQueue::FLAG_AUTODELETE);
}
if ($this->queueOptions['declare'] && ! in_array($queueName, $this->declaredQueues, true)) {
$this->context->declareQueue($queue);
$this->declaredQueues[] = $queueName;
}
if ($this->queueOptions['bind']) {
$this->context->bind(new AmqpBind($queue, $topic, $queue->getQueueName()));
}
return [$queue, $topic];
}
protected function getQueueName($queueName = null)
{
return $queueName ?: $this->queueName;
}
/**
* @param string $action
* @param \Throwable $e
* @throws \Exception
*/
protected function reportConnectionError($action, \Throwable $e)
{
Log::error('AMQP error while attempting '.$action.': '.$e->getMessage());
if ($this->sleepOnError === false) {
throw new \RuntimeException('Error writing data to the connection with RabbitMQ', null, $e);
}
sleep($this->sleepOnError);
}
public function size($queue = null)
{
return 0;
}
}
两个类放完结构

config/queue.php 文件增加
'rabbiMQ' => [
'type' => 'rabbitMq',
'dsn' => env('RABBITMQ_DSN', null),
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'vhost' => env('RABBITMQ_VHOST', '/'),
'login' => env('RABBITMQ_LOGIN', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'queue' => env('RABBITMQ_QUEUE', 'default'),
'options' => [
'exchange' => [
'name' => env('RABBITMQ_EXCHANGE_NAME',"default_exchange"),
/*
* 确定如果exchange不存在,是否应创建exchange。
*/
'declare' => env('RABBITMQ_EXCHANGE_DECLARE', true),
/*
* 阅读更多信息 https://www.rabbitmq.com/tutorials/amqp-concepts.html
*/
'type' => env('RABBITMQ_EXCHANGE_TYPE', \Interop\Amqp\AmqpTopic::TYPE_DIRECT),
'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false),
'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true),
'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
'arguments' => env('RABBITMQ_EXCHANGE_ARGUMENTS'),
],
'queue' => [
/*
* 确定如果队列不存在,是否应创建该队列
*/
'declare' => env('RABBITMQ_QUEUE_DECLARE', true),
/*
* 确定是否应将队列绑定到创建的exchange。
*/
'bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true),
/*
* 阅读更多信息 https://www.rabbitmq.com/tutorials/amqp-concepts.html
*/
'passive' => env('RABBITMQ_QUEUE_PASSIVE', false),
'durable' => env('RABBITMQ_QUEUE_DURABLE', true),
'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false),
'arguments' => env('RABBITMQ_QUEUE_ARGUMENTS'),
],
],
]
config/queue.php default 值切换成 rabbiMQ 即可使用。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)