顾乔芝士网

持续更新的前后端开发技术栈

thinkphp kafka 运用

在ThinkPHP项目中使用Kafka,可以通过集成`php-rdkafka`库来实现消息队列的高效处理和实时通信。以下是从安装到使用Kafka的详细步骤和示例代码:

### 1. 安装 Kafka PHP 客户端

在ThinkPHP项目中,推荐使用`php-rdkafka`库来与Kafka进行交互。可以通过Composer安装该库:

```bash

composer require php-rdkafka/php-rdkafka

```

### 2. 配置 Kafka

在ThinkPHP项目的配置文件中,可以添加Kafka的配置信息,例如:

```php

return [

'bootstrap_servers' => 'localhost:9092', // Kafka broker地址

'default_topic' => 'test_topic', // 默认主题

'default_group' => 'test_group', // 默认消费者组

'ack_level' => 'all', // 确认级别

'socket_timeout' => 10000, // 套接字超时

'session_timeout' => 10000, // 会话超时

'compression' => 'lz4', // 压缩类型

'retries' => 5, // 重试次数

'retry_backoff' => 1000, // 重试间隔

];

```

### 3. 生产者示例

创建一个生产者类,用于向Kafka主题发送消息:

```php

<?php

namespace app\common\lib;

use RdKafka\Conf;

use RdKafka\Producer;

use RdKafka\TopicConf;

class KafkaProducer

{

private $producer;

public function __construct($config)

{

$conf = new Conf();

$conf->set('metadata.broker.list', $config['bootstrap_servers']);

$conf->set('message.timeout.ms', $config['socket_timeout']);

$conf->set('compression.type', $config['compression']);

$conf->set('acks', $config['ack_level']);

$conf->set('retries', $config['retries']);

$conf->set('retry.backoff.ms', $config['retry_backoff']);

$this->producer = new Producer($conf);

}

public function sendMessage($topicName, $message, $messageKey = null)

{

$topicConf = new TopicConf();

$topicConf->set('request.required.acks', $this->config['ack_level']);

$topic = $this->producer->newTopic($topicName, $topicConf);

$topic->produce(RD_KAFKA_PARTITION_UA, 0, $message, $messageKey);

$this->producer->poll(0);

$result = $this->producer->flush(2000);

return $result === RD_KAFKA_RESP_ERR_NO_ERROR;

}

}

```

### 4. 消费者示例

创建一个消费者类,用于从Kafka主题接收消息:

```php

<?php

namespace app\common\lib;

use RdKafka\Conf;

use RdKafka\KafkaConsumer;

use RdKafka\Message;

class KafkaConsumer

{

private $consumer;

public function __construct($config)

{

$conf = new Conf();

$conf->set('group.id', $config['default_group']);

$conf->set('bootstrap.servers', $config['bootstrap_servers']);

$conf->set('auto.offset.reset', 'earliest');

$conf->set('session.timeout.ms', $config['session_timeout']);

$this->consumer = new KafkaConsumer($conf);

}

public function subscribe($topicName)

{

$this->consumer->subscribe([$topicName]);

}

public function consumeMessage($timeout = 12000)

{

$message = $this->consumer->consume($timeout);

switch ($message->err) {

case RD_KAFKA_RESP_ERR_NO_ERROR:

return $message->payload;

case RD_KAFKA_RESP_ERR__PARTITION_EOF:

echo "分区已消费完毕,等待新消息\n";

break;

case RD_KAFKA_RESP_ERR__TIMED_OUT:

echo "消费超时,检查网络或调整timeout参数\n";

break;

default:

echo "错误: " . $message->errstr() . "\n";

break;

}

return null;

}

public function close()

{

$this->consumer->unsubscribe();

$this->consumer->close();

}

}

```

### 5. 使用示例

在控制器中,可以使用生产者和消费者类来发送和接收消息:

```php

<?php

namespace app\index\controller;

use app\common\lib\KafkaProducer;

use app\common\lib\KafkaConsumer;

use think\facade\Config;

class KafkaController

{

public function send()

{

$config = Config::get('kafka');

$producer = new KafkaProducer($config);

$result = $producer->sendMessage($config['default_topic'], 'Hello, Kafka!');

if ($result) {

return '消息发送成功';

} else {

return '消息发送失败';

}

}

public function receive()

{

$config = Config::get('kafka');

$consumer = new KafkaConsumer($config);

$consumer->subscribe($config['default_topic']);

while (true) {

$message = $consumer->consumeMessage();

if ($message !== null) {

echo "收到消息: $message\n";

}

}

$consumer->close();

}

}

```

### 6. Kafka 的优势

- **高吞吐量**:支持大量的消息发布和订阅。

- **持久性**:消息会被持久化到磁盘,保证消息不丢失。

- **可靠性**:通过配置可以实现至少传递一次或恰好传递一次的消息传递语义。

- **扩展性**:通过分区和副本机制,可以实现水平扩展。

通过上述步骤,你可以在ThinkPHP项目中集成Kafka,实现高效的消息队列处理和实时通信。

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言