springboot整合kafka

﹏ヽ暗。殇╰゛Y 2024-04-18 11:47 75阅读 0赞

官方资料:https://docs.spring.io/spring-boot/docs/2.1.7.RELEASE/reference/html/boot-features-messaging.html#boot-features-kafka

1. 创建工程**spring-boot-kafka,pom.xml文件依赖如下**

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>com.zsx</groupId>
  6. <artifactId>spring-boot-kafka</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. <packaging>jar</packaging>
  9. <name>spring-boot-kafka</name>
  10. <description>Spring Boot Kafka Project</description>
  11. <parent>
  12. <groupId>com.zsx</groupId>
  13. <artifactId>spring-boot-dependencies</artifactId>
  14. <version>0.0.1-SNAPSHOT</version>
  15. <relativePath>../springboot-dependencies/pom.xml</relativePath>
  16. </parent>
  17. <dependencies>
  18. <dependency>
  19. <groupId>org.springframework.kafka</groupId>
  20. <artifactId>spring-kafka</artifactId>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.springframework.kafka</groupId>
  24. <artifactId>spring-kafka-test</artifactId>
  25. <scope>test</scope>
  26. </dependency>
  27. <!-- junit5运行所需jar包 -->
  28. <dependency>
  29. <groupId>org.junit.platform</groupId>
  30. <artifactId>junit-platform-launcher</artifactId>
  31. <scope>test</scope>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.junit.jupiter</groupId>
  35. <artifactId>junit-jupiter-engine</artifactId>
  36. <scope>test</scope>
  37. </dependency>
  38. </dependencies>
  39. <build>
  40. <plugins>
  41. <plugin>
  42. <groupId>org.springframework.boot</groupId>
  43. <artifactId>spring-boot-maven-plugin</artifactId>
  44. <configuration>
  45. <mainClass>com.zsx.KafkaApplication</mainClass>
  46. </configuration>
  47. </plugin>
  48. </plugins>
  49. </build>
  50. </project>

2. application.yml文件配置

  1. spring:
  2. kafka:
  3. bootstrap-servers: 172.20.202.22:9092
  4. consumer:
  5. group-id: myGroup
  6. auto-offset-reset: earliest
  7. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  8. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  9. producer:
  10. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11. value-serializer: org.apache.kafka.common.serialization.StringSerializer

3. 生产者

  1. package com.zsx.service;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.kafka.core.KafkaTemplate;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class KafkaSender {
  9. private final static Logger LOGGER = LoggerFactory.getLogger(KafkaReceiver.class);
  10. @Autowired
  11. private KafkaTemplate<String, String> kafkaTemplate;
  12. //发送消息方法
  13. public void send() {
  14. String message = "Hello world";
  15. LOGGER.info("KafkaSender.send", message);
  16. kafkaTemplate.send("someTopic", message);
  17. }
  18. }

4. 消费者

  1. package com.zsx.service;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class KafkaReceiver {
  8. private final static Logger LOGGER = LoggerFactory.getLogger(KafkaReceiver.class);
  9. @KafkaListener(topics = "someTopic")
  10. public void processMessage(String content) {
  11. LOGGER.info("KafkaReceiver.processMessage: " + content);
  12. }
  13. }

5. 引导类

  1. package com.zsx;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class KafkaApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(KafkaApplication.class, args);
  8. }
  9. }

6. 测试类

  1. package com.zsx.test;
  2. import com.zsx.service.KafkaSender;
  3. import org.junit.jupiter.api.Test;
  4. import org.junit.jupiter.api.extension.ExtendWith;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. import org.springframework.test.context.junit.jupiter.SpringExtension;
  8. @SpringBootTest
  9. @ExtendWith(SpringExtension.class)
  10. public class KafkaSenderTest {
  11. @Autowired
  12. private KafkaSender kafkaSender;
  13. @Test
  14. void testSend() {
  15. kafkaSender.send();
  16. }
  17. }

7. 运行测试方法,查看测试结果

  1. C:\software\jdk-11.0.3\bin\java.exe -ea -Didea.test.cyclic.buffer.size=1048576 "-javaagent:C:\software\JetBrains\IntelliJ IDEA 2019.1.3\lib\idea_rt.jar=50257:C:\software\JetBrains\IntelliJ IDEA 2019.1.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\software\JetBrains\IntelliJ IDEA 2019.1.3\lib\idea_rt.jar;C:\software\JetBrains\IntelliJ IDEA 2019.1.3\plugins\junit\lib\junit-rt.jar;C:\software\JetBrains\IntelliJ IDEA 2019.1.3\plugins\junit\lib\junit5-rt.jar;D:\repository\org\junit\vintage\junit-vintage-engine\5.3.2\junit-vintage-engine-5.3.2.jar;D:\repository\org\apiguardian\apiguardian-api\1.0.0\apiguardian-api-1.0.0.jar;D:\repository\org\junit\platform\junit-platform-engine\1.3.2\junit-platform-engine-1.3.2.jar;D:\repository\org\junit\platform\junit-platform-commons\1.3.2\junit-platform-commons-1.3.2.jar;D:\repository\org\opentest4j\opentest4j\1.1.1\opentest4j-1.1.1.jar;D:\repository\junit\junit\4.12\junit-4.12.jar;D:\repository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;F:\IdeaProjects\springboot\springboot-kafka\target\test-classes;F:\IdeaProjects\springboot\springboot-kafka\target\classes;D:\repository\org\springframework\kafka\spring-kafka\2.2.8.RELEASE\spring-kafka-2.2.8.RELEASE.jar;D:\repository\org\springframework\spring-context\5.1.9.RELEASE\spring-context-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-aop\5.1.9.RELEASE\spring-aop-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-beans\5.1.9.RELEASE\spring-beans-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-expression\5.1.9.RELEASE\spring-expression-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-messaging\5.1.9.RELEASE\spring-messaging-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-tx\5.1.9.RELEASE\spring-tx-5.1.9.RELEASE.jar;D:\repository\org\springframework\retry\spring-retry\1.2.4.RELEASE\spring-retry-1.2.4.RELEASE.jar;D:\repository\org\apache\kafka\kafka-clients\2.0.1\kafka-clients-2.0.1.jar;D:\repository\org\lz4\lz4-java\1.4.1\lz4-java-1.4.1.jar;D:\repository\org\xerial\snappy\snappy-java\1.1.7.1\snappy-java-1.1.7.1.jar;D:\repository\org\slf4j\slf4j-api\1.7.26\slf4j-api-1.7.26.jar;D:\repository\org\springframework\kafka\spring-kafka-test\2.2.8.RELEASE\spring-kafka-test-2.2.8.RELEASE.jar;D:\repository\org\springframework\spring-test\5.1.9.RELEASE\spring-test-5.1.9.RELEASE.jar;D:\repository\org\apache\kafka\kafka-clients\2.0.1\kafka-clients-2.0.1-test.jar;D:\repository\com\fasterxml\jackson\core\jackson-databind\2.9.9\jackson-databind-2.9.9.jar;D:\repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;D:\repository\com\fasterxml\jackson\core\jackson-core\2.9.9\jackson-core-2.9.9.jar;D:\repository\net\sf\jopt-simple\jopt-simple\5.0.4\jopt-simple-5.0.4.jar;D:\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;D:\repository\org\scala-lang\scala-library\2.11.12\scala-library-2.11.12.jar;D:\repository\org\scala-lang\scala-reflect\2.11.12\scala-reflect-2.11.12.jar;D:\repository\com\typesafe\scala-logging\scala-logging_2.11\3.9.0\scala-logging_2.11-3.9.0.jar;D:\repository\com\101tec\zkclient\0.10\zkclient-0.10.jar;D:\repository\org\apache\zookeeper\zookeeper\3.4.13\zookeeper-3.4.13.jar;D:\repository\org\apache\yetus\audience-annotations\0.5.0\audience-annotations-0.5.0.jar;D:\repository\org\apache\kafka\kafka_2.11\2.0.1\kafka_2.11-2.0.1-test.jar;D:\repository\org\junit\platform\junit-platform-launcher\1.3.2\junit-platform-launcher-1.3.2.jar;D:\repository\org\junit\jupiter\junit-jupiter-engine\5.3.2\junit-jupiter-engine-5.3.2.jar;D:\repository\org\junit\jupiter\junit-jupiter-api\5.3.2\junit-jupiter-api-5.3.2.jar;D:\repository\org\springframework\boot\spring-boot-starter-web\2.1.7.RELEASE\spring-boot-starter-web-2.1.7.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter\2.1.7.RELEASE\spring-boot-starter-2.1.7.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot\2.1.7.RELEASE\spring-boot-2.1.7.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-autoconfigure\2.1.7.RELEASE\spring-boot-autoconfigure-2.1.7.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter-logging\2.1.7.RELEASE\spring-boot-starter-logging-2.1.7.RELEASE.jar;D:\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;D:\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;D:\repository\org\apache\logging\log4j\log4j-to-slf4j\2.11.2\log4j-to-slf4j-2.11.2.jar;D:\repository\org\apache\logging\log4j\log4j-api\2.11.2\log4j-api-2.11.2.jar;D:\repository\org\slf4j\jul-to-slf4j\1.7.26\jul-to-slf4j-1.7.26.jar;D:\repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;D:\repository\org\yaml\snakeyaml\1.23\snakeyaml-1.23.jar;D:\repository\org\springframework\boot\spring-boot-starter-json\2.1.7.RELEASE\spring-boot-starter-json-2.1.7.RELEASE.jar;D:\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.9\jackson-datatype-jdk8-2.9.9.jar;D:\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.9\jackson-datatype-jsr310-2.9.9.jar;D:\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.9\jackson-module-parameter-names-2.9.9.jar;D:\repository\org\springframework\boot\spring-boot-starter-tomcat\2.1.7.RELEASE\spring-boot-starter-tomcat-2.1.7.RELEASE.jar;D:\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.22\tomcat-embed-core-9.0.22.jar;D:\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.22\tomcat-embed-el-9.0.22.jar;D:\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.22\tomcat-embed-websocket-9.0.22.jar;D:\repository\org\hibernate\validator\hibernate-validator\6.0.17.Final\hibernate-validator-6.0.17.Final.jar;D:\repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;D:\repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;D:\repository\com\fasterxml\classmate\1.4.0\classmate-1.4.0.jar;D:\repository\org\springframework\spring-web\5.1.9.RELEASE\spring-web-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-webmvc\5.1.9.RELEASE\spring-webmvc-5.1.9.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter-test\2.1.7.RELEASE\spring-boot-starter-test-2.1.7.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-test\2.1.7.RELEASE\spring-boot-test-2.1.7.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-test-autoconfigure\2.1.7.RELEASE\spring-boot-test-autoconfigure-2.1.7.RELEASE.jar;D:\repository\com\jayway\jsonpath\json-path\2.4.0\json-path-2.4.0.jar;D:\repository\net\minidev\json-smart\2.3\json-smart-2.3.jar;D:\repository\net\minidev\accessors-smart\1.2\accessors-smart-1.2.jar;D:\repository\org\ow2\asm\asm\5.0.4\asm-5.0.4.jar;D:\repository\org\assertj\assertj-core\3.11.1\assertj-core-3.11.1.jar;D:\repository\org\mockito\mockito-core\2.23.4\mockito-core-2.23.4.jar;D:\repository\net\bytebuddy\byte-buddy\1.9.16\byte-buddy-1.9.16.jar;D:\repository\net\bytebuddy\byte-buddy-agent\1.9.16\byte-buddy-agent-1.9.16.jar;D:\repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;D:\repository\org\hamcrest\hamcrest-library\1.3\hamcrest-library-1.3.jar;D:\repository\org\skyscreamer\jsonassert\1.5.0\jsonassert-1.5.0.jar;D:\repository\com\vaadin\external\google\android-json\0.0.20131108.vaadin1\android-json-0.0.20131108.vaadin1.jar;D:\repository\org\springframework\spring-core\5.1.9.RELEASE\spring-core-5.1.9.RELEASE.jar;D:\repository\org\springframework\spring-jcl\5.1.9.RELEASE\spring-jcl-5.1.9.RELEASE.jar;D:\repository\org\xmlunit\xmlunit-core\2.6.3\xmlunit-core-2.6.3.jar;D:\repository\org\glassfish\jaxb\jaxb-core\2.3.0.1\jaxb-core-2.3.0.1.jar;D:\repository\javax\xml\bind\jaxb-api\2.3.1\jaxb-api-2.3.1.jar;D:\repository\javax\activation\javax.activation-api\1.2.0\javax.activation-api-1.2.0.jar;D:\repository\org\glassfish\jaxb\txw2\2.3.1\txw2-2.3.1.jar;D:\repository\com\sun\istack\istack-commons-runtime\3.0.5\istack-commons-runtime-3.0.5.jar;D:\repository\com\sun\xml\bind\jaxb-impl\2.3.2\jaxb-impl-2.3.2.jar" com.intellij.rt.execution.junit.JUnitStarter -ideVersion5 -junit5 com.zsx.test.KafkaSenderTest,testSend
  2. 09:49:01.936 [main] DEBUG org.springframework.test.context.BootstrapUtils - Instantiating CacheAwareContextLoaderDelegate from class [org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate]
  3. 09:49:01.949 [main] DEBUG org.springframework.test.context.BootstrapUtils - Instantiating BootstrapContext using constructor [public org.springframework.test.context.support.DefaultBootstrapContext(java.lang.Class,org.springframework.test.context.CacheAwareContextLoaderDelegate)]
  4. 09:49:01.962 [main] DEBUG org.springframework.test.context.BootstrapUtils - Instantiating TestContextBootstrapper for test class [com.zsx.test.KafkaSenderTest] from class [org.springframework.boot.test.context.SpringBootTestContextBootstrapper]
  5. 09:49:01.970 [main] INFO org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Neither @ContextConfiguration nor @ContextHierarchy found for test class [com.zsx.test.KafkaSenderTest], using SpringBootContextLoader
  6. 09:49:01.972 [main] DEBUG org.springframework.test.context.support.AbstractContextLoader - Did not detect default resource location for test class [com.zsx.test.KafkaSenderTest]: class path resource [com/zsx/test/KafkaSenderTest-context.xml] does not exist
  7. 09:49:01.973 [main] DEBUG org.springframework.test.context.support.AbstractContextLoader - Did not detect default resource location for test class [com.zsx.test.KafkaSenderTest]: class path resource [com/zsx/test/KafkaSenderTestContext.groovy] does not exist
  8. 09:49:01.973 [main] INFO org.springframework.test.context.support.AbstractContextLoader - Could not detect default resource locations for test class [com.zsx.test.KafkaSenderTest]: no resource found for suffixes {-context.xml, Context.groovy}.
  9. 09:49:01.974 [main] INFO org.springframework.test.context.support.AnnotationConfigContextLoaderUtils - Could not detect default configuration classes for test class [com.zsx.test.KafkaSenderTest]: KafkaSenderTest does not declare any static, non-private, non-final, nested classes annotated with @Configuration.
  10. 09:49:01.999 [main] DEBUG org.springframework.test.context.support.ActiveProfilesUtils - Could not find an 'annotation declaring class' for annotation type [org.springframework.test.context.ActiveProfiles] and class [com.zsx.test.KafkaSenderTest]
  11. 09:49:02.044 [main] DEBUG org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider - Identified candidate component class: file [F:\IdeaProjects\springboot\springboot-kafka\target\classes\com\zsx\KafkaApplication.class]
  12. 09:49:02.045 [main] INFO org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Found @SpringBootConfiguration com.zsx.KafkaApplication for test class com.zsx.test.KafkaSenderTest
  13. 09:49:02.101 [main] DEBUG org.springframework.boot.test.context.SpringBootTestContextBootstrapper - @TestExecutionListeners is not present for class [com.zsx.test.KafkaSenderTest]: using defaults.
  14. 09:49:02.101 [main] INFO org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Loaded default TestExecutionListener class names from location [META-INF/spring.factories]: [org.springframework.test.context.web.ServletTestExecutionListener, org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener, org.springframework.test.context.support.DependencyInjectionTestExecutionListener, org.springframework.test.context.support.DirtiesContextTestExecutionListener, org.springframework.test.context.transaction.TransactionalTestExecutionListener, org.springframework.test.context.jdbc.SqlScriptsTestExecutionListener, org.springframework.boot.test.mock.mockito.MockitoTestExecutionListener, org.springframework.boot.test.mock.mockito.ResetMocksTestExecutionListener, org.springframework.boot.test.autoconfigure.restdocs.RestDocsTestExecutionListener, org.springframework.boot.test.autoconfigure.web.client.MockRestServiceServerResetTestExecutionListener, org.springframework.boot.test.autoconfigure.web.servlet.MockMvcPrintOnlyOnFailureTestExecutionListener, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverTestExecutionListener]
  15. 09:49:02.113 [main] INFO org.springframework.boot.test.context.SpringBootTestContextBootstrapper - Using TestExecutionListeners: [org.springframework.test.context.web.ServletTestExecutionListener@3e7dd664, org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener@5b1ebf56, org.springframework.boot.test.mock.mockito.MockitoTestExecutionListener@294a6b8e, org.springframework.boot.test.autoconfigure.SpringBootDependencyInjectionTestExecutionListener@4b1d6571, org.springframework.test.context.support.DirtiesContextTestExecutionListener@1b835480, org.springframework.test.context.transaction.TransactionalTestExecutionListener@3549bca9, org.springframework.test.context.jdbc.SqlScriptsTestExecutionListener@4f25b795, org.springframework.boot.test.mock.mockito.ResetMocksTestExecutionListener@6fb365ed, org.springframework.boot.test.autoconfigure.restdocs.RestDocsTestExecutionListener@6e950bcf, org.springframework.boot.test.autoconfigure.web.client.MockRestServiceServerResetTestExecutionListener@16414e40, org.springframework.boot.test.autoconfigure.web.servlet.MockMvcPrintOnlyOnFailureTestExecutionListener@74bada02, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverTestExecutionListener@525575]
  16. 09:49:02.115 [main] DEBUG org.springframework.test.context.support.AbstractDirtiesContextTestExecutionListener - Before test class: context [DefaultTestContext@517d4a0d testClass = KafkaSenderTest, testInstance = [null], testMethod = [null], testException = [null], mergedContextConfiguration = [WebMergedContextConfiguration@7862f56 testClass = KafkaSenderTest, locations = '{}', classes = '{class com.zsx.KafkaApplication}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{org.springframework.boot.test.context.SpringBootTestContextBootstrapper=true}', contextCustomizers = set[org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@268f106e, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@6c284af, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0, org.springframework.boot.test.web.client.TestRestTemplateContextCustomizer@609e8838, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@6c4906d3], resourceBasePath = 'src/main/webapp', contextLoader = 'org.springframework.boot.test.context.SpringBootContextLoader', parent = [null]], attributes = map['org.springframework.test.context.web.ServletTestExecutionListener.activateListener' -> true]], class annotated with @DirtiesContext [false] with mode [null].
  17. 09:49:02.134 [main] DEBUG org.springframework.test.context.support.TestPropertySourceUtils - Adding inlined properties to environment: {spring.jmx.enabled=false, org.springframework.boot.test.context.SpringBootTestContextBootstrapper=true, server.port=-1}
  18. . ____ _ __ _ _
  19. /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
  20. ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
  21. \\/ ___)| |_)| | | | | || (_| | ) ) ) )
  22. ' |____| .__|_| |_|_| |_\__, | / / / /
  23. =========|_|==============|___/=/_/_/_/
  24. :: Spring Boot :: (v2.1.7.RELEASE)
  25. 2019-09-09 09:49:02.515 INFO 12840 --- [ main] com.zsx.test.KafkaSenderTest : Starting KafkaSenderTest on zsx with PID 12840 (started by zhang in F:\IdeaProjects\springboot\springboot-kafka)
  26. 2019-09-09 09:49:02.517 INFO 12840 --- [ main] com.zsx.test.KafkaSenderTest : No active profile set, falling back to default profiles: default
  27. 2019-09-09 09:49:03.236 INFO 12840 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$6993af06] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
  28. 2019-09-09 09:49:03.693 INFO 12840 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
  29. 2019-09-09 09:49:03.956 INFO 12840 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
  30. auto.commit.interval.ms = 5000
  31. auto.offset.reset = earliest
  32. bootstrap.servers = [172.20.202.22:9092]
  33. check.crcs = true
  34. client.id =
  35. connections.max.idle.ms = 540000
  36. default.api.timeout.ms = 60000
  37. enable.auto.commit = true
  38. exclude.internal.topics = true
  39. fetch.max.bytes = 52428800
  40. fetch.max.wait.ms = 500
  41. fetch.min.bytes = 1
  42. group.id = myGroup
  43. heartbeat.interval.ms = 3000
  44. interceptor.classes = []
  45. internal.leave.group.on.close = true
  46. isolation.level = read_uncommitted
  47. key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
  48. max.partition.fetch.bytes = 1048576
  49. max.poll.interval.ms = 300000
  50. max.poll.records = 500
  51. metadata.max.age.ms = 300000
  52. metric.reporters = []
  53. metrics.num.samples = 2
  54. metrics.recording.level = INFO
  55. metrics.sample.window.ms = 30000
  56. partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
  57. receive.buffer.bytes = 65536
  58. reconnect.backoff.max.ms = 1000
  59. reconnect.backoff.ms = 50
  60. request.timeout.ms = 30000
  61. retry.backoff.ms = 100
  62. sasl.client.callback.handler.class = null
  63. sasl.jaas.config = null
  64. sasl.kerberos.kinit.cmd = /usr/bin/kinit
  65. sasl.kerberos.min.time.before.relogin = 60000
  66. sasl.kerberos.service.name = null
  67. sasl.kerberos.ticket.renew.jitter = 0.05
  68. sasl.kerberos.ticket.renew.window.factor = 0.8
  69. sasl.login.callback.handler.class = null
  70. sasl.login.class = null
  71. sasl.login.refresh.buffer.seconds = 300
  72. sasl.login.refresh.min.period.seconds = 60
  73. sasl.login.refresh.window.factor = 0.8
  74. sasl.login.refresh.window.jitter = 0.05
  75. sasl.mechanism = GSSAPI
  76. security.protocol = PLAINTEXT
  77. send.buffer.bytes = 131072
  78. session.timeout.ms = 10000
  79. ssl.cipher.suites = null
  80. ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
  81. ssl.endpoint.identification.algorithm = https
  82. ssl.key.password = null
  83. ssl.keymanager.algorithm = SunX509
  84. ssl.keystore.location = null
  85. ssl.keystore.password = null
  86. ssl.keystore.type = JKS
  87. ssl.protocol = TLS
  88. ssl.provider = null
  89. ssl.secure.random.implementation = null
  90. ssl.trustmanager.algorithm = PKIX
  91. ssl.truststore.location = null
  92. ssl.truststore.password = null
  93. ssl.truststore.type = JKS
  94. value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
  95. 2019-09-09 09:49:04.066 INFO 12840 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
  96. 2019-09-09 09:49:04.066 INFO 12840 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
  97. 2019-09-09 09:49:04.139 INFO 12840 --- [ main] org.apache.kafka.clients.Metadata : Cluster ID: wSgEsr98SyiHVZtHj0dNmQ
  98. 2019-09-09 09:49:04.148 INFO 12840 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
  99. auto.commit.interval.ms = 5000
  100. auto.offset.reset = earliest
  101. bootstrap.servers = [172.20.202.22:9092]
  102. check.crcs = true
  103. client.id =
  104. connections.max.idle.ms = 540000
  105. default.api.timeout.ms = 60000
  106. enable.auto.commit = true
  107. exclude.internal.topics = true
  108. fetch.max.bytes = 52428800
  109. fetch.max.wait.ms = 500
  110. fetch.min.bytes = 1
  111. group.id = myGroup
  112. heartbeat.interval.ms = 3000
  113. interceptor.classes = []
  114. internal.leave.group.on.close = true
  115. isolation.level = read_uncommitted
  116. key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
  117. max.partition.fetch.bytes = 1048576
  118. max.poll.interval.ms = 300000
  119. max.poll.records = 500
  120. metadata.max.age.ms = 300000
  121. metric.reporters = []
  122. metrics.num.samples = 2
  123. metrics.recording.level = INFO
  124. metrics.sample.window.ms = 30000
  125. partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
  126. receive.buffer.bytes = 65536
  127. reconnect.backoff.max.ms = 1000
  128. reconnect.backoff.ms = 50
  129. request.timeout.ms = 30000
  130. retry.backoff.ms = 100
  131. sasl.client.callback.handler.class = null
  132. sasl.jaas.config = null
  133. sasl.kerberos.kinit.cmd = /usr/bin/kinit
  134. sasl.kerberos.min.time.before.relogin = 60000
  135. sasl.kerberos.service.name = null
  136. sasl.kerberos.ticket.renew.jitter = 0.05
  137. sasl.kerberos.ticket.renew.window.factor = 0.8
  138. sasl.login.callback.handler.class = null
  139. sasl.login.class = null
  140. sasl.login.refresh.buffer.seconds = 300
  141. sasl.login.refresh.min.period.seconds = 60
  142. sasl.login.refresh.window.factor = 0.8
  143. sasl.login.refresh.window.jitter = 0.05
  144. sasl.mechanism = GSSAPI
  145. security.protocol = PLAINTEXT
  146. send.buffer.bytes = 131072
  147. session.timeout.ms = 10000
  148. ssl.cipher.suites = null
  149. ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
  150. ssl.endpoint.identification.algorithm = https
  151. ssl.key.password = null
  152. ssl.keymanager.algorithm = SunX509
  153. ssl.keystore.location = null
  154. ssl.keystore.password = null
  155. ssl.keystore.type = JKS
  156. ssl.protocol = TLS
  157. ssl.provider = null
  158. ssl.secure.random.implementation = null
  159. ssl.trustmanager.algorithm = PKIX
  160. ssl.truststore.location = null
  161. ssl.truststore.password = null
  162. ssl.truststore.type = JKS
  163. value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
  164. 2019-09-09 09:49:04.152 INFO 12840 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
  165. 2019-09-09 09:49:04.153 INFO 12840 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
  166. 2019-09-09 09:49:04.154 INFO 12840 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
  167. 2019-09-09 09:49:04.166 INFO 12840 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : Cluster ID: wSgEsr98SyiHVZtHj0dNmQ
  168. 2019-09-09 09:49:04.169 INFO 12840 --- [ main] com.zsx.test.KafkaSenderTest : Started KafkaSenderTest in 2.026 seconds (JVM running for 2.72)
  169. 2019-09-09 09:49:04.183 INFO 12840 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=myGroup] Discovered group coordinator 172.20.202.22:9092 (id: 2147482646 rack: null)
  170. 2019-09-09 09:49:04.185 INFO 12840 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=myGroup] Revoking previously assigned partitions []
  171. 2019-09-09 09:49:04.186 INFO 12840 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: []
  172. 2019-09-09 09:49:04.186 INFO 12840 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=myGroup] (Re-)joining group
  173. 2019-09-09 09:49:04.198 INFO 12840 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=myGroup] Successfully joined group with generation 1
  174. 2019-09-09 09:49:04.200 INFO 12840 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=myGroup] Setting newly assigned partitions [someTopic-0]
  175. 2019-09-09 09:49:04.219 INFO 12840 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=myGroup] Resetting offset for partition someTopic-0 to offset 0.
  176. 2019-09-09 09:49:04.220 INFO 12840 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [someTopic-0]
  177. 2019-09-09 09:49:04.247 INFO 12840 --- [ntainer#0-0-C-1] com.zsx.service.KafkaReceiver : KafkaReceiver.processMessage: Hello world
  178. 2019-09-09 09:49:04.247 INFO 12840 --- [ntainer#0-0-C-1] com.zsx.service.KafkaReceiver : KafkaReceiver.processMessage: Hello world
  179. 2019-09-09 09:49:04.247 INFO 12840 --- [ntainer#0-0-C-1] com.zsx.service.KafkaReceiver : KafkaReceiver.processMessage: Hello world
  180. 2019-09-09 09:49:04.247 INFO 12840 --- [ntainer#0-0-C-1] com.zsx.service.KafkaReceiver : KafkaReceiver.processMessage: Hello world
  181. 2019-09-09 09:49:04.388 INFO 12840 --- [ main] com.zsx.service.KafkaReceiver : KafkaSender.send
  182. 2019-09-09 09:49:04.390 INFO 12840 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
  183. acks = 1
  184. batch.size = 16384
  185. bootstrap.servers = [172.20.202.22:9092]
  186. buffer.memory = 33554432
  187. client.id =
  188. compression.type = none
  189. connections.max.idle.ms = 540000
  190. enable.idempotence = false
  191. interceptor.classes = []
  192. key.serializer = class org.apache.kafka.common.serialization.StringSerializer
  193. linger.ms = 0
  194. max.block.ms = 60000
  195. max.in.flight.requests.per.connection = 5
  196. max.request.size = 1048576
  197. metadata.max.age.ms = 300000
  198. metric.reporters = []
  199. metrics.num.samples = 2
  200. metrics.recording.level = INFO
  201. metrics.sample.window.ms = 30000
  202. partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
  203. receive.buffer.bytes = 32768
  204. reconnect.backoff.max.ms = 1000
  205. reconnect.backoff.ms = 50
  206. request.timeout.ms = 30000
  207. retries = 0
  208. retry.backoff.ms = 100
  209. sasl.client.callback.handler.class = null
  210. sasl.jaas.config = null
  211. sasl.kerberos.kinit.cmd = /usr/bin/kinit
  212. sasl.kerberos.min.time.before.relogin = 60000
  213. sasl.kerberos.service.name = null
  214. sasl.kerberos.ticket.renew.jitter = 0.05
  215. sasl.kerberos.ticket.renew.window.factor = 0.8
  216. sasl.login.callback.handler.class = null
  217. sasl.login.class = null
  218. sasl.login.refresh.buffer.seconds = 300
  219. sasl.login.refresh.min.period.seconds = 60
  220. sasl.login.refresh.window.factor = 0.8
  221. sasl.login.refresh.window.jitter = 0.05
  222. sasl.mechanism = GSSAPI
  223. security.protocol = PLAINTEXT
  224. send.buffer.bytes = 131072
  225. ssl.cipher.suites = null
  226. ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
  227. ssl.endpoint.identification.algorithm = https
  228. ssl.key.password = null
  229. ssl.keymanager.algorithm = SunX509
  230. ssl.keystore.location = null
  231. ssl.keystore.password = null
  232. ssl.keystore.type = JKS
  233. ssl.protocol = TLS
  234. ssl.provider = null
  235. ssl.secure.random.implementation = null
  236. ssl.trustmanager.algorithm = PKIX
  237. ssl.truststore.location = null
  238. ssl.truststore.password = null
  239. ssl.truststore.type = JKS
  240. transaction.timeout.ms = 60000
  241. transactional.id = null
  242. value.serializer = class org.apache.kafka.common.serialization.StringSerializer
  243. 2019-09-09 09:49:04.401 INFO 12840 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
  244. 2019-09-09 09:49:04.401 INFO 12840 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
  245. 2019-09-09 09:49:04.405 INFO 12840 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: wSgEsr98SyiHVZtHj0dNmQ
  246. 2019-09-09 09:49:04.414 INFO 12840 --- [ntainer#0-0-C-1] com.zsx.service.KafkaReceiver : KafkaReceiver.processMessage: Hello world
  247. 2019-09-09 09:49:04.419 INFO 12840 --- [ntainer#0-0-C-1] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService
  248. 2019-09-09 09:49:04.424 INFO 12840 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped
  249. 2019-09-09 09:49:04.425 INFO 12840 --- [ Thread-1] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
  250. 2019-09-09 09:49:04.425 INFO 12840 --- [ Thread-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
  251. Process finished with exit code 0

20190909095038495.png

20190909095102463.png

从结果可以看出配置成功

发表评论

表情:
评论列表 (有 0 条评论,75人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot整合kafka

    > 经过前三篇文章 安装jdk 安装zookeeper 以及安装kafka 全部已经竣工了,不知道小伙伴们成功搭建kafka了不。 > 憋了三天的大招,今天放出来吧。今天大家