programing

기본 바인더가 요청되었지만 'org.springframework.cloud.stream.messaging에 사용할 수 있는 바인더가 없습니다.특성 채널로 직접 연결'

bestprogram 2023. 6. 21. 22:52

기본 바인더가 요청되었지만 'org.springframework.cloud.stream.messaging에 사용할 수 있는 바인더가 없습니다.특성 채널로 직접 연결'

스프링 클라우드 + 카프카 스트림 + 스프링 부트 2로 가능한 한 가장 간단한 헬로 월드를 만들려고 합니다.

기본적인 개념이 그립다는 것을 깨달았습니다.기본적으로, 저는 다음을 이해합니다.

1 - 카프카 항목에 메시지를 쓸 아웃바운드 스트림과 카프카 항목에서 메시지를 읽을 인바운드 스트림을 정의해야 합니다.

public interface LoansStreams {

    String INPUT = "loans-in";
    String OUTPUT = "loans-out";

    @Input(INPUT)
    SubscribableChannel inboundLoans();

    @Output(OUTPUT)
    MessageChannel outboundLoans();

}

2 - 내 스트림에 바인딩되도록 Spring Cloud Stream 구성

@EnableBinding(LoansStreams.class)
public class StreamsConfig {
}

3 - Kafka 속성 구성

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        loans-in:
          destination: loans
          contentType: application/json
        loans-out:
          destination: loans
          contentType: application/json

4 - 메시지 교환 모델 만들기

@Getter @Setter @ToString @Builder
public class Loans {
    private long timestamp;
    private String result;
}

5 - 카프카에게 편지 쓰기

@Service
@Slf4j
public class LoansService {
    private final LoansStreams loansStreams;
    public LoansService(LoansStreams loansStreams) {
        this.loansStreams = loansStreams;
    }
    public void sendLoan(final Loans loans) {
        log.info("Sending loans {}", loans);
        MessageChannel messageChannel = loansStreams.outboundLoans();
        messageChannel.send(MessageBuilder
                .withPayload(loans)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }
}

6 - 카프카 주제 듣기

@Component
@Slf4j
public class LoansListener {

    @StreamListener(LoansStreams.INPUT)
    public void handleLoans(@Payload Loans loans) {
        log.info("Received results: {}", loans);

    }
}

저는 블로그 몇 개를 읽으며 하루 종일 보냈고 위의 코드는 적어도 작동 가능하다고 생각합니다.저는 제가 가능한 한 최선의 접근법을 코딩하고 있는지 잘 모르겠습니다.그런데 다음 항목에 언급된 오류가 있습니다.

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 18:33:05.619 ERROR 14784 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.

해결 방법을 검색해보니 모델을 반환하는 StreamListe를 코드화하라는 누군가의 말을 발견하여 다음으로 대체했습니다.

@StreamListener(LoansStreams.INPUT)
@SendTo("loans-out")
public KStream<?, Loans> process(KStream<?, Loans> l) {
    log.info("Received: {}", l);
    return l;
}

그런 다음 오류가 발생합니다(이전 오류에서 일부 바인더 문제가 명확하게 언급됨).

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 19:01:06.016 ERROR 13276 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalArgumentException: Method must be declarative
        at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
        at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:510) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
        at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:168) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
        at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:226) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]

어떻게든 도움이 될 경우, 저는 SAGAS를 적용하기 위해 이 아이디어를 진화시키고 싶지만 이 질문의 초점이 아닙니다.우선 기본적인 것부터 시작해야 합니다.

*재밌는

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.mybank</groupId>
    <artifactId>kafka-cloud-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-cloud-stream</name>
    <description>Spring Cloud Stream With Kafka</description>

    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <!-- version>5.1.5.RELEASE</version-->
        </dependency>

    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

"기본 바인더가 요청되었지만 사용 가능한 바인더가 없습니다...". 아래와 같이 종속성을 추가하십시오.

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

application.yml(또는 application.properties)에서 기본 바인더를 정의할 수 있습니다.

spring:
  cloud:
    stream:
      bindings:
        ...
      default-binder: kafka

나는, 다른 것으로.application.properties다양한 컨텍스트와 여러 출력 바인딩에 대해 다음과 같은 일반적인 기본 바인딩을 정의하는 것이 수정할 수 있는 유일한 방법이었습니다.

spring:
  cloud:
    stream:
      default-binder: eventhub
      ...

그리고 나머지 바인딩 유형도 각 입력/출력에 개별적으로 설정됩니다.

위의 pom 파일에서, 당신은 사용해야 합니다.binder-kafka그리고 아닌binder-kafka-streams

따라서 대체

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

와 함께

         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

언급URL : https://stackoverflow.com/questions/55875428/a-default-binder-has-been-requested-but-there-are-no-binders-available-for-org