Public Network Access Configuration
Enable Cluster Authentication Configuration
Cluster Authentication Configuration
View SASL Access Point
[SASL_PLAINTEXT Protocol Address](/docs/ukafka/guide/node/address#SASL_PLAINTEXT Protocol Address)
Configure Public Network Forwarding
1. Create a Cloud Server in the Same Subnet as the UKafka Cluster
Note: This document is based on CentOS 7.9
2. Install nginx on the Cloud Server
yum install nginx nginx-all-modules.noarch
3. Configure nginx Proxy
- Edit /etc/nginx/nginx.conf to add the stream configuration
stream {
    log_format proxy '$remote_addr [$time_local] '
                     '$protocol $status $bytes_sent $bytes_received '
                     '$session_time "$upstream_addr" '
                     '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';
    access_log /var/log/nginx/tcp-access.log proxy;
    open_log_file_cache off;
    # Keep all TCP proxy configuration files in one directory for easier management
    include /etc/nginx/tcpConf.d/*.conf;
}
- Create the /etc/nginx/tcpConf.d/ directory
mkdir -p /etc/nginx/tcpConf.d/
- Edit the /etc/nginx/tcpConf.d/kafka.conf configuration file
upstream tcp9093 {
    server 10.23.26.105:9093;
}
upstream tcp9094 {
    server 10.23.180.35:9094;
}
upstream tcp9095 {
    server 10.23.202.164:9095;
}
server {
    listen 9093;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9093;
}
server {
    listen 9094;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9094;
}
server {
    listen 9095;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9095;
}
- Start nginx
systemctl start nginx.service
- Verify the status of the nginx service
systemctl status nginx.service
4. Host Firewall Configuration
The host firewall must allow inbound TCP connections on ports 9093, 9094, and 9095 so that nginx can forward them to the cluster.
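Once the firewall allows these ports, it can be useful to confirm from a client machine that each forwarded port accepts TCP connections. The following is a minimal sketch in Java and not part of the original procedure. The class name `PortCheck`, the 3-second timeout, and the default host are assumptions chosen for illustration. It only checks that a TCP connection can be opened and does not exercise SASL authentication.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable hostnames
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: pass the public IP of the forwarding cloud server as the first argument
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int[] ports = {9093, 9094, 9095};
        for (int port : ports) {
            System.out.println(host + ":" + port + " reachable=" + isReachable(host, port, 3000));
        }
    }
}
```

A port reported as unreachable usually points to the host firewall, the nginx listener, or the cloud security rules rather than to Kafka itself.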
Public Network Access
1. Configure Local Hosts
Modify the local /etc/hosts file so that every hostname on which UKafka listens for the SASL_PLAINTEXT protocol resolves to the public IP of the cloud server that forwards the requests:
113.31.114.125 ukafka-q0j01x5g-kafka1
113.31.114.125 ukafka-q0j01x5g-kafka2
113.31.114.125 ukafka-q0j01x5g-kafka3
2. Access with Kafka Command Line Tool
Download and Configure Kafka Command Line Tool
The following uses the Kafka 2.6.1 command line tool as an example. Download the version of the command line tool that matches your instance version.
# Download the command line tool
wget https://archive.apache.org/dist/kafka/2.6.1/kafka_2.13-2.6.1.tgz
# Extract
tar -zxvf kafka_2.13-2.6.1.tgz
# Enter the command line tool root directory
cd kafka_2.13-2.6.1
Configure the config/kafka_client_jaas.conf file as follows, where username and password are the credentials that were set when cluster authentication was enabled in the console:
cat > config/kafka_client_jaas.conf << EOF
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin_pass";
};
EOF
Send Message
- Modify bin/kafka-console-producer.sh to add the java.security.auth.login.config parameter
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=./config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"
- Modify config/producer.properties to specify security.protocol and sasl.mechanism
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- Run bin/kafka-console-producer.sh to send messages:
bin/kafka-console-producer.sh --producer.config ./config/producer.properties --topic foo --broker-list ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095
>hello
>world
>foo
>bar
>
Receive Message
- Modify bin/kafka-console-consumer.sh to add the java.security.auth.login.config parameter
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=./config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"
- Modify config/consumer.properties to specify security.protocol and sasl.mechanism
group.id=test-consumer-group
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- Run bin/kafka-console-consumer.sh to receive messages:
bin/kafka-console-consumer.sh --consumer.config ./config/consumer.properties --topic foo --bootstrap-server ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095 --from-beginning
world
hello
foo
bar
3. Access with Java SDK
The Java SDK can supply the credentials in several ways, such as through the kafka_client_jaas.conf file or by setting the sasl.jaas.config property directly. The code examples that follow show three methods.
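As an illustration of the sasl.jaas.config approach, the security-related properties can be assembled in one place and reused by both producers and consumers. This is a minimal sketch that uses only `java.util.Properties`. The class name `SaslPlainConfig` and its helper methods are hypothetical, and the escaping of quotes and backslashes reflects an assumption about how the client parses quoted JAAS option values.

```java
import java.util.Properties;

public class SaslPlainConfig {
    /** Escapes backslashes and double quotes so the value can sit inside a quoted JAAS option. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Builds the value of the sasl.jaas.config client property for the PLAIN mechanism. */
    static String jaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + escape(username) + "\" "
                + "password=\"" + escape(password) + "\";";
    }

    /** Returns connection and security properties; serializers and group.id are left to the caller. */
    static Properties securityProperties(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        Properties props = securityProperties(
                "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095",
                "admin", "admin_pass");
        // Prints the JAAS line that would otherwise live in kafka_client_jaas.conf
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

With this approach no JAAS file or JVM parameter is needed, which can be convenient when the credentials come from application configuration.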
If you use the kafka_client_jaas.conf configuration file, set username and password to the credentials that were configured when cluster authentication was enabled in the console:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin_pass";
};
Producer Example
package cn.ucloud.ukafka.Example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerExample {
    public static void main(String[] args) throws Exception {
        // Method 1: Set the java.security.auth.login.config system property to the path of the JAAS configuration file
        System.setProperty("java.security.auth.login.config", "./config/kafka_client_jaas.conf");
        // Method 2: Pass the JVM startup parameter `-Djava.security.auth.login.config=./config/kafka_client_jaas.conf`
        Properties props = new Properties();
        props.put("bootstrap.servers", "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095");
        // security.protocol and sasl.mechanism must be specified
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Method 3: Set the JAAS configuration content directly in Properties
        // props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin_pass\";");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // try-with-resources closes the producer when the block exits
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String topic = "foo";
            String value = "this is the message's value";
            ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, value);
            Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
            // Block until the send completes so the result can be printed
            RecordMetadata recordMetadata = metadataFuture.get();
            System.out.println(String.format("Produce ok: topic:%s partition:%d offset:%d value:%s",
                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), value));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Consumer Example
package cn.ucloud.ukafka.Example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) throws Exception {
        // Method 1: Set the java.security.auth.login.config system property to the path of the JAAS configuration file
        System.setProperty("java.security.auth.login.config", "./config/kafka_client_jaas.conf");
        // Method 2: Pass the JVM startup parameter `-Djava.security.auth.login.config=./config/kafka_client_jaas.conf`
        Properties props = new Properties();
        props.put("bootstrap.servers", "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095");
        props.put("group.id", "test_group");
        props.put("auto.offset.reset", "earliest");
        // security.protocol and sasl.mechanism must be specified
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Method 3: Set the JAAS configuration content directly in Properties
        // props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin_pass\";");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<String> subscribedTopics = new ArrayList<>();
        subscribedTopics.add("foo");
        consumer.subscribe(subscribedTopics);
        while (true) {
            try {
                // poll(Duration) is used instead of the deprecated poll(long) overload
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Consume topic:%s partition:%d offset:%d value:%s",
                            record.topic(), record.partition(), record.offset(), record.value()));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
Authentication support and configuration differ between the Kafka SDKs for other languages. For specific settings, refer to the documentation of the relevant SDK.