springboot整合kafka
官方资料:https://docs.spring.io/spring-boot/docs/2.1.7.RELEASE/reference/html/boot-features-messaging.html#boot-features-kafka
1. 创建工程**spring-boot-kafka,pom.xml文件依赖如下**
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zsx</groupId>
<artifactId>spring-boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-kafka</name>
<description>Spring Boot Kafka Project</description>
<parent>
<groupId>com.zsx</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../springboot-dependencies/pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!-- junit5运行所需jar包 -->
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.zsx.KafkaApplication</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
2. application.yml文件配置
spring:
kafka:
bootstrap-servers: 172.20.202.22:9092
consumer:
group-id: myGroup
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
3. 生产者
package com.zsx.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaSender {
private final static Logger LOGGER = LoggerFactory.getLogger(KafkaReceiver.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//发送消息方法
public void send() {
String message = "Hello world";
LOGGER.info("KafkaSender.send", message);
kafkaTemplate.send("someTopic", message);
}
}
4. 消费者
package com.zsx.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaReceiver {
private final static Logger LOGGER = LoggerFactory.getLogger(KafkaReceiver.class);
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
LOGGER.info("KafkaReceiver.processMessage: " + content);
}
}
5. 引导类
package com.zsx;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}
6. 测试类
package com.zsx.test;
import com.zsx.service.KafkaSender;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@SpringBootTest
@ExtendWith(SpringExtension.class)
public class KafkaSenderTest {
@Autowired
private KafkaSender kafkaSender;
@Test
void testSend() {
kafkaSender.send();
}
}
7. 运行测试方法,查看测试结果
C:\software\jdk-11.0.3\bin\java.exe -ea -Didea.test.cyclic.buffer.size=1048576 "-javaagent:C:\software\JetBrains\IntelliJ IDEA 2019.1.3\lib\idea_rt.jar=50257:C:\software\JetBrains\IntelliJ IDEA 2019.1.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\software\JetBrains\IntelliJ IDEA 2019.1.3\lib\idea_rt.jar;C:\software\JetBrains\IntelliJ IDEA 2019.1.3\plugins\junit\lib\junit-rt.jar;C:\software\JetBrains\IntelliJ IDEA 2019.1.3\plugins\junit\lib\junit5-rt.jar;D:\repository\org\junit\vintage\junit-vintage-engine\5.3.2\junit-vintage-engine-5.3.2.jar;D:\repository\org\apiguardian\apiguardian-api\1.0.0\apiguardian-api-1.0.0.jar;D:\repository\org\junit\platform\junit-platform-engine\1.3.2\junit-platform-engine-1.3.2.jar;D:\repository\org\junit\platform\junit-platform-commons\1.3.2\junit-platform-commons-1.3.2.jar;D:\repository\org\opentest4j\opentest4j\1.1.1\opentest4j-1.1.1.jar;D:\repository\junit\junit\4.12\junit-4.12.jar;D:\repository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;F:\IdeaProjects\springboot\springboot-kafka\target\test-classes;F:\IdeaProjects\springboot\springboot-kafka\target\classes;D:\repository\org\springframework\kafka\spring-kafka\2.2.8.RELEASE\spring-kafka-2.2.8.RELEASE.jar;D:\repository\org\springframework\spring-context\5.1.9.RELEASE\spring-context-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-aop\5.1.9.RELEASE\spring-aop-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-beans\5.1.9.RELEASE\spring-beans-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-expression\5.1.9.RELEASE\spring-expression-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-messaging\5.1.9.RELEASE\spring-messaging-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-tx\5.1.9.RELEASE\spring-tx-5.1.9.RELEASE.jar;D:\repository\org\springframework\retry\spring-retry\1.2.4.RELEASE\spring-retry-1.2.4.RELEASE.jar;D:\repository\org\apache\kafka\kafka-clients\2.0.1\kafka-clients-2.0.1.jar;D:\repository\org\lz4\lz4-java\1.4.1\lz4-java-1.4.1.jar;D:\repository\org\xerial\snappy\snappy-java\1.1.7.1\snappy-java-1.1.7.1.jar;D:\repository\org\slf4j\slf4j-api\1.7.26\slf4j-api-1.7.26.jar;D:\repository\org\springframework\kafka\spring-kafka-test\2.2.8.RELEASE\spring-kafka-test-2.2.8.RELEASE.jar;D:\repository\org\springframework\spring-test\5.1.9.RELEASE\spring-test-5.1.9.RELEASE.jar;D:\repository\org\apache\kafka\kafka-clients\2.0.1\kafka-clients-2.0.1-test.jar;D:\repository\com\fasterxml\jackson\core\jackson-databind\2.9.9\jackson-databind-2.9.9.jar;D:\repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;D:\repository\com\fasterxml\jackson\core\jackson-core\2.9.9\jackson-core-2.9.9.jar;D:\repository\net\sf\jopt-simple\jopt-simple\5.0.4\jopt-simple-5.0.4.jar;D:\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;D:\repository\org\scala-lang\scala-library\2.11.12\scala-library-2.11.12.jar;D:\repository\org\scala-lang\scala-reflect\2.11.12\scala-reflect-2.11.12.jar;D:\repository\com\typesafe\scala-logging\scala-logging_2.11\3.9.0\scala-logging_2.11-3.9.0.jar;D:\repository\com\101tec\zkclient\0.10\zkclient-0.10.jar;D:\repository\org\apache\zookeeper\zookeeper\3.4.13\zookeeper-3.4.13.jar;D:\repository\org\apache\yetus\audience-annotations\0.5.0\audience-annotations-0.5.0.jar;D:\repository\org\apache\kafka\kafka_2.11\2.0.1\kafka_2.11-2.0.1-test.jar;D:\repository\org\junit\platform\junit-platform-launcher\1.3.2\junit-platform-launcher-1.3.2.jar;D:\repository\org\junit\jupiter\junit-jupiter-engine\5.3.2\junit-jupiter-engine-5.3.2.jar;D:\repository\org\junit\jupiter\junit-jupiter-api\5.3.2\junit-jupiter-api-5.3.2.jar;D:\repository\org\springframework\boot\spring-boot-starter-web\2.1.7.RELEASE\spring-boot-starter-web-2.1.7.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter\2.1.7.RELEASE\spring-boot-starter-2.1.7.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot\2.1.7.RELEASE\spring-boot-2.1.7.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-autoconfigure\2.1.7.RELEASE\spring-boot-autoconfigure-2.1.7.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter-logging\2.1.7.RELEASE\spring-boot-starter-logging-2.1.7.RELEASE.jar;D:\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;D:\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;D:\repository\org\apache\logging\log4j\log4j-to-slf4j\2.11.2\log4j-to-slf4j-2.11.2.jar;D:\repository\org\apache\logging\log4j\log4j-api\2.11.2\log4j-api-2.11.2.jar;D:\repository\org\slf4j\jul-to-slf4j\1.7.26\jul-to-slf4j-1.7.26.jar;D:\repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;D:\repository\org\yaml\snakeyaml\1.23\snakeyaml-1.23.jar;D:\repository\org\springframework\boot\spring-boot-starter-json\2.1.7.RELEASE\spring-boot-starter-json-2.1.7.RELEASE.jar;D:\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.9\jackson-datatype-jdk8-2.9.9.jar;D:\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.9\jackson-datatype-jsr310-2.9.9.jar;D:\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.9\jackson-module-parameter-names-2.9.9.jar;D:\repository\org\springframework\boot\spring-boot-starter-tomcat\2.1.7.RELEASE\spring-boot-starter-tomcat-2.1.7.RELEASE.jar;D:\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.22\tomcat-embed-core-9.0.22.jar;D:\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.22\tomcat-embed-el-9.0.22.jar;D:\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.22\tomcat-embed-websocket-9.0.22.jar;D:\repository\org\hibernate\validator\hibernate-validator\6.0.17.Final\hibernate-validator-6.0.17.Final.jar;D:\repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;D:\repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;D:\repository\com\fasterxml\classmate\1.4.0\classmate-1.4.0.jar;D:\repository\org\springframework\spring-web\5.1.9.RELEASE\spring-web-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-webmvc\5.1.9.RELEASE\spring-webmvc-5.1.9.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter-test\2.1.7.RELEASE\spring-boot-starter-test-2.1.7.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-test\2.1.7.RELEASE\spring-boot-test-2.1.7.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-test-autoconfigure\2.1.7.RELEASE\spring-boot-test-autoconfigure-2.1.7.RELEASE.jar;D:\repository\com\jayway\jsonpath\json-path\2.4.0\json-path-2.4.0.jar;D:\repository\net\minidev\json-smart\2.3\json-smart-2.3.jar;D:\repository\net\minidev\accessors-smart\1.2\accessors-smart-1.2.jar;D:\repository\org\ow2\asm\asm\5.0.4\asm-5.0.4.jar;D:\repository\org\assertj\assertj-core\3.11.1\assertj-core-3.11.1.jar;D:\repository\org\mockito\mockito-core\2.23.4\mockito-core-2.23.4.jar;D:\repository\net\bytebuddy\byte-buddy\1.9.16\byte-buddy-1.9.16.jar;D:\repository\net\bytebuddy\byte-buddy-agent\1.9.16\byte-buddy-agent-1.9.16.jar;D:\repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;D:\repository\org\hamcrest\hamcrest-library\1.3\hamcrest-library-1.3.jar;D:\repository\org\skyscreamer\jsonassert\1.5.0\jsonassert-1.5.0.jar;D:\repository\com\vaadin\external\google\android-json\0.0.20131108.vaadin1\android-json-0.0.20131108.vaadin1.jar;D:\repository\org\springframework\spring-core\5.1.9.RELEASE\spring-core-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-jcl\5.1.9.RELEASE\spring-jcl-5.1.9.RELEASE.jar;D:\repository\org\xmlunit\xmlunit-core\2.6.3\xmlunit-core-2.6.3.jar;D:\repository\org\glassfish\jaxb\jaxb-core\2.3.0.1\jaxb-core-2.3.0.1.jar;D:\repository\javax\xml\bind\jaxb-api\2.3.1\jaxb-api-2.3.1.jar;D:\repository\javax\activation\javax.activation-api\1.2.0\javax.activation-api-1.2.0.jar;D:\repository\org\glassfish\jaxb\txw2\2.3.1\txw2-2.3.1.jar;D:\repository\com\sun\istack\istack-commons-runtime\3.0.5\istack-commons-runtime-3.0.5.jar;D:\repository\com\sun\xml\bind\jaxb-impl\2.3.2\jaxb-impl-2.3.2.jar" com.intellij.rt.execution.junit.JUnitStarter -ideVersion5 -junit5 com.zsx.test.KafkaSenderTest,testSend
09:49:01.936 [main] DEBUG org.springframework.test.context.BootstrapUtils - Instantiating CacheAwareContextLoaderDelegate from class [org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate]
09:49:01.949 [main] DEBUG org.springframework.test.context.BootstrapUtils - Instantiating BootstrapContext using constructor [public org.springframework.test.context.support.DefaultBootstrapContext(java.lang.Class,org.springframework.test.context.CacheAwareContextLoaderDelegate)]
09:49:01.962 [main] DEBUG org.springframework.test.context.BootstrapUtils - Instantiating TestContextBootstrapper for test class [com.zsx.test.KafkaSenderTest] from class [org.springframework.boot.test.context.SpringBootTestContextBootstrapper]
09:49:01.970 [main] INFO org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Neither @ContextConfiguration nor @ContextHierarchy found for test class [com.zsx.test.KafkaSenderTest], using SpringBootContextLoader
09:49:01.972 [main] DEBUG org.springframework.test.context.support.AbstractContextLoader - Did not detect default resource location for test class [com.zsx.test.KafkaSenderTest]: class path resource [com/zsx/test/KafkaSenderTest-context.xml] does not exist
09:49:01.973 [main] DEBUG org.springframework.test.context.support.AbstractContextLoader - Did not detect default resource location for test class [com.zsx.test.KafkaSenderTest]: class path resource [com/zsx/test/KafkaSenderTestContext.groovy] does not exist
09:49:01.973 [main] INFO org.springframework.test.context.support.AbstractContextLoader - Could not detect default resource locations for test class [com.zsx.test.KafkaSenderTest]: no resource found for suffixes {-context.xml, Context.groovy}.
09:49:01.974 [main] INFO org.springframework.test.context.support.AnnotationConfigContextLoaderUtils - Could not detect default configuration classes for test class [com.zsx.test.KafkaSenderTest]: KafkaSenderTest does not declare any static, non-private, non-final, nested classes annotated with @Configuration.
09:49:01.999 [main] DEBUG org.springframework.test.context.support.ActiveProfilesUtils - Could not find an 'annotation declaring class' for annotation type [org.springframework.test.context.ActiveProfiles] and class [com.zsx.test.KafkaSenderTest]
09:49:02.044 [main] DEBUG org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider - Identified candidate component class: file [F:\IdeaProjects\springboot\springboot-kafka\target\classes\com\zsx\KafkaApplication.class]
09:49:02.045 [main] INFO org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Found @SpringBootConfiguration com.zsx.KafkaApplication for test class com.zsx.test.KafkaSenderTest
09:49:02.101 [main] DEBUG org.springframework.boot.test.context.SpringBootTestContextBootstrapper - @TestExecutionListeners is not present for class [com.zsx.test.KafkaSenderTest]: using defaults.
09:49:02.101 [main] INFO org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Loaded default TestExecutionListener class names from location [META-INF/spring.factories]: [org.springframework.test.context.web.ServletTestExecutionListener, org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener, org.springframework.test.context.support.DependencyInjectionTestExecutionListener, org.springframework.test.context.support.DirtiesContextTestExecutionListener, org.springframework.test.context.transaction.TransactionalTestExecutionListener, org.springframework.test.context.jdbc.SqlScriptsTestExecutionListener, org.springframework.boot.test.mock.mockito.MockitoTestExecutionListener, org.springframework.boot.test.mock.mockito.ResetMocksTestExecutionListener, org.springframework.boot.test.autoconfigure.restdocs.RestDocsTestExecutionListener, org.springframework.boot.test.autoconfigure.web.client.MockRestServiceServerResetTestExecutionListener, org.springframework.boot.test.autoconfigure.web.servlet.MockMvcPrintOnlyOnFailureTestExecutionListener, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverTestExecutionListener]
09:49:02.113 [main] INFO org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Using TestExecutionListeners: [org.springframework.test.context.web.ServletTestExecutionListener@3e7dd664, org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener@5b1ebf56, org.springframework.boot.test.mock.mockito.MockitoTestExecutionListener@294a6b8e, org.springframework.boot.test.autoconfigure.SpringBootDependencyInjectionTestExecutionListener@4b1d6571, org.springframework.test.context.support.DirtiesContextTestExecutionListener@1b835480, org.springframework.test.context.transaction.TransactionalTestExecutionListener@3549bca9, org.springframework.test.context.jdbc.SqlScriptsTestExecutionListener@4f25b795, org.springframework.boot.test.mock.mockito.ResetMocksTestExecutionListener@6fb365ed, org.springframework.boot.test.autoconfigure.restdocs.RestDocsTestExecutionListener@6e950bcf, org.springframework.boot.test.autoconfigure.web.client.MockRestServiceServerResetTestExecutionListener@16414e40, org.springframework.boot.test.autoconfigure.web.servlet.MockMvcPrintOnlyOnFailureTestExecutionListener@74bada02, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverTestExecutionListener@525575]
09:49:02.115 [main] DEBUG org.springframework.test.context.support.AbstractDirtiesContextTestExecutionListener - Before test class: context [DefaultTestContext@517d4a0d testClass = KafkaSenderTest, testInstance = [null], testMethod = [null], testException = [null], mergedContextConfiguration = [WebMergedContextConfiguration@7862f56 testClass = KafkaSenderTest, locations = '{}', classes = '{class com.zsx.KafkaApplication}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{org.springframework.boot.test.context.SpringBootTestContextBootstrapper=true}', contextCustomizers = set[org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@268f106e, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@6c284af, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0, org.springframework.boot.test.web.client.TestRestTemplateContextCustomizer@609e8838, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@6c4906d3], resourceBasePath = 'src/main/webapp', contextLoader = 'org.springframework.boot.test.context.SpringBootContextLoader', parent = [null]], attributes = map['org.springframework.test.context.web.ServletTestExecutionListener.activateListener' -> true]], class annotated with @DirtiesContext [false] with mode [null].
09:49:02.134 [main] DEBUG org.springframework.test.context.support.TestPropertySourceUtils - Adding inlined properties to environment: {spring.jmx.enabled=false, org.springframework.boot.test.context.SpringBootTestContextBootstrapper=true, server.port=-1}
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.1.7.RELEASE)
2019-09-09 09:49:02.515 INFO 12840 --- [ main] com.zsx.test.KafkaSenderTest : Starting KafkaSenderTest on zsx with PID 12840 (started by zhang in F:\IdeaProjects\springboot\springboot-kafka)
2019-09-09 09:49:02.517 INFO 12840 --- [ main] com.zsx.test.KafkaSenderTest : No active profile set, falling back to default profiles: default
2019-09-09 09:49:03.236 INFO 12840 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$6993af06] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-09-09 09:49:03.693 INFO 12840 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-09-09 09:49:03.956 INFO 12840 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [172.20.202.22:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = myGroup
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2019-09-09 09:49:04.066 INFO 12840 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
2019-09-09 09:49:04.066 INFO 12840 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
2019-09-09 09:49:04.139 INFO 12840 --- [ main] org.apache.kafka.clients.Metadata : Cluster ID: wSgEsr98SyiHVZtHj0dNmQ
2019-09-09 09:49:04.148 INFO 12840 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [172.20.202.22:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = myGroup
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2019-09-09 09:49:04.152 INFO 12840 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
2019-09-09 09:49:04.153 INFO 12840 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
2019-09-09 09:49:04.154 INFO 12840 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2019-09-09 09:49:04.166 INFO 12840 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : Cluster ID: wSgEsr98SyiHVZtHj0dNmQ
2019-09-09 09:49:04.169 INFO 12840 --- [ main] com.zsx.test.KafkaSenderTest : Started KafkaSenderTest in 2.026 seconds (JVM running for 2.72)
2019-09-09 09:49:04.183 INFO 12840 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=myGroup] Discovered group coordinator 172.20.202.22:9092 (id: 2147482646 rack: null)
2019-09-09 09:49:04.185 INFO 12840 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=myGroup] Revoking previously assigned partitions []
2019-09-09 09:49:04.186 INFO 12840 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: []
2019-09-09 09:49:04.186 INFO 12840 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=myGroup] (Re-)joining group
2019-09-09 09:49:04.198 INFO 12840 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=myGroup] Successfully joined group with generation 1
2019-09-09 09:49:04.200 INFO 12840 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=myGroup] Setting newly assigned partitions [someTopic-0]
2019-09-09 09:49:04.219 INFO 12840 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=myGroup] Resetting offset for partition someTopic-0 to offset 0.
2019-09-09 09:49:04.220 INFO 12840 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [someTopic-0]
2019-09-09 09:49:04.247 INFO 12840 --- [ntainer#0-0-C-1] com.zsx.service.KafkaReceiver : KafkaReceiver.processMessage: Hello world
2019-09-09 09:49:04.247 INFO 12840 --- [ntainer#0-0-C-1] com.zsx.service.KafkaReceiver : KafkaReceiver.processMessage: Hello world
2019-09-09 09:49:04.247 INFO 12840 --- [ntainer#0-0-C-1] com.zsx.service.KafkaReceiver : KafkaReceiver.processMessage: Hello world
2019-09-09 09:49:04.247 INFO 12840 --- [ntainer#0-0-C-1] com.zsx.service.KafkaReceiver : KafkaReceiver.processMessage: Hello world
2019-09-09 09:49:04.388 INFO 12840 --- [ main] com.zsx.service.KafkaReceiver : KafkaSender.send
2019-09-09 09:49:04.390 INFO 12840 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [172.20.202.22:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2019-09-09 09:49:04.401 INFO 12840 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
2019-09-09 09:49:04.401 INFO 12840 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
2019-09-09 09:49:04.405 INFO 12840 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: wSgEsr98SyiHVZtHj0dNmQ
2019-09-09 09:49:04.414 INFO 12840 --- [ntainer#0-0-C-1] com.zsx.service.KafkaReceiver : KafkaReceiver.processMessage: Hello world
2019-09-09 09:49:04.419 INFO 12840 --- [ntainer#0-0-C-1] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService
2019-09-09 09:49:04.424 INFO 12840 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
2019-09-09 09:49:04.425 INFO 12840 --- [ Thread-1] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
2019-09-09 09:49:04.425 INFO 12840 --- [ Thread-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
Process finished with exit code 0
从结果可以看出配置成功
还没有评论,来说两句吧...