在ThinkPHP项目中使用Kafka,可以通过集成`php-rdkafka`库来实现消息队列的高效处理和实时通信。以下是从安装到使用Kafka的详细步骤和示例代码:
### 1. 安装 Kafka PHP 客户端
在ThinkPHP项目中,推荐使用`php-rdkafka`库来与Kafka进行交互。可以通过Composer安装该库:
```bash
composer require php-rdkafka/php-rdkafka
```
### 2. 配置 Kafka
在ThinkPHP项目的配置文件中,可以添加Kafka的配置信息,例如:
```php
return [
'bootstrap_servers' => 'localhost:9092', // Kafka broker地址
'default_topic' => 'test_topic', // 默认主题
'default_group' => 'test_group', // 默认消费者组
'ack_level' => 'all', // 确认级别
'socket_timeout' => 10000, // 套接字超时
'session_timeout' => 10000, // 会话超时
'compression' => 'lz4', // 压缩类型
'retries' => 5, // 重试次数
'retry_backoff' => 1000, // 重试间隔
];
```
### 3. 生产者示例
创建一个生产者类,用于向Kafka主题发送消息:
```php
<?php
namespace app\common\lib;
use RdKafka\Conf;
use RdKafka\Producer;
use RdKafka\TopicConf;
class KafkaProducer
{
private $producer;
public function __construct($config)
{
$conf = new Conf();
$conf->set('metadata.broker.list', $config['bootstrap_servers']);
$conf->set('message.timeout.ms', $config['socket_timeout']);
$conf->set('compression.type', $config['compression']);
$conf->set('acks', $config['ack_level']);
$conf->set('retries', $config['retries']);
$conf->set('retry.backoff.ms', $config['retry_backoff']);
$this->producer = new Producer($conf);
}
public function sendMessage($topicName, $message, $messageKey = null)
{
$topicConf = new TopicConf();
$topicConf->set('request.required.acks', $this->config['ack_level']);
$topic = $this->producer->newTopic($topicName, $topicConf);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $message, $messageKey);
$this->producer->poll(0);
$result = $this->producer->flush(2000);
return $result === RD_KAFKA_RESP_ERR_NO_ERROR;
}
}
```
### 4. 消费者示例
创建一个消费者类,用于从Kafka主题接收消息:
```php
<?php
namespace app\common\lib;
use RdKafka\Conf;
use RdKafka\KafkaConsumer;
use RdKafka\Message;
class KafkaConsumer
{
private $consumer;
public function __construct($config)
{
$conf = new Conf();
$conf->set('group.id', $config['default_group']);
$conf->set('bootstrap.servers', $config['bootstrap_servers']);
$conf->set('auto.offset.reset', 'earliest');
$conf->set('session.timeout.ms', $config['session_timeout']);
$this->consumer = new KafkaConsumer($conf);
}
public function subscribe($topicName)
{
$this->consumer->subscribe([$topicName]);
}
public function consumeMessage($timeout = 12000)
{
$message = $this->consumer->consume($timeout);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
return $message->payload;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "分区已消费完毕,等待新消息\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "消费超时,检查网络或调整timeout参数\n";
break;
default:
echo "错误: " . $message->errstr() . "\n";
break;
}
return null;
}
public function close()
{
$this->consumer->unsubscribe();
$this->consumer->close();
}
}
```
### 5. 使用示例
在控制器中,可以使用生产者和消费者类来发送和接收消息:
```php
<?php
namespace app\index\controller;
use app\common\lib\KafkaProducer;
use app\common\lib\KafkaConsumer;
use think\facade\Config;
class KafkaController
{
public function send()
{
$config = Config::get('kafka');
$producer = new KafkaProducer($config);
$result = $producer->sendMessage($config['default_topic'], 'Hello, Kafka!');
if ($result) {
return '消息发送成功';
} else {
return '消息发送失败';
}
}
public function receive()
{
$config = Config::get('kafka');
$consumer = new KafkaConsumer($config);
$consumer->subscribe($config['default_topic']);
while (true) {
$message = $consumer->consumeMessage();
if ($message !== null) {
echo "收到消息: $message\n";
}
}
$consumer->close();
}
}
```
### 6. Kafka 的优势
- **高吞吐量**:支持大量的消息发布和订阅。
- **持久性**:消息会被持久化到磁盘,保证消息不丢失。
- **可靠性**:通过配置可以实现至少传递一次或恰好传递一次的消息传递语义。
- **扩展性**:通过分区和副本机制,可以实现水平扩展。
通过上述步骤,你可以在ThinkPHP项目中集成Kafka,实现高效的消息队列处理和实时通信。