使用 Spring Modulith 進行事件外部化

2024年2月6日 23点热度 0人点赞

在本文中,我們將討論在@Transactional塊中發佈消息需求以及相關的性能挑戰,例如延長的數據庫連接時間。為了解決這個問題,我們將利用Spring Modulith的功能來監聽 Spring 應用程序事件並自動將它們發佈到Kafka主題。

事件外部化即從模塊內部向外發佈事件到其他地方。

事務操作和消息代理
對於本文的代碼示例,我們假設我們正在編寫負責在 Baeldung 上保存文章的功能:

@Service
class Baeldung {
    private final ArticleRepository articleRepository;
    // constructor
    @Transactional
    public void createArticle(Article article) {
        validateArticle(article);
        article = addArticleTags(article);
        // ... other business logic
        articleRepository.save(article);
    }
}


此外,我們需要將這條新Article通知系統的其他部分。有了這些信息,其他模塊或服務將做出相應反應,創建報告或向網站讀者發送新聞通訊。

實現此目的的最簡單方法是註入知道如何發佈此事件的依賴項。對於我們的示例,讓我們使用KafkaOperations向“
baeldung.articles.published ”主題發送消息,並使用Article的slug()作為鍵:

@Service
class Baeldung {
    private final ArticleRepository articleRepository;
    private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;
    // constructor
    @Transactional
    public void createArticle(Article article) {
        // ... business logic
        validateArticle(article);
        article = addArticleTags(article);
        article = articleRepository.save(article);
        messageProducer.send(
          "baeldung.articles.published",
          article.slug(),
          new ArticlePublishedEvent(article.slug(), article.title())
        ).join();
    }
}


然而,由於一些不同的原因,這種方法並不理想。從設計的角度來看,我們將領域服務與消息生產者耦合起來。此外,領域服務直接依賴於較低級別的組件,這違反了基本的清潔
架構規則之一。

此外,這種方法還會產生性能影響,因為一切都發生在@Transacional方法中。因此,為保存文章而獲取的數據庫連接將保持打開狀態,直到消息成功發佈。

最後,保存實體和發佈消息將作為原子操作完成。換句話說,如果生產者未能發佈事件,數據庫事務將被回滾。

使用 Spring 事件進行依賴反轉
我們可以利用Spring Events來改進我們解決方案的設計。我們的目標是避免直接從我們的域服務將消息發佈到Kafka。讓我們刪除KafkaOperations依賴項
並發佈內部應用程序事件:

@Service
public class Baeldung {
    private final ApplicationEventPublisher applicationEvents;
    private final ArticleRepository articleRepository;
    // constructor
    @Transactional
    public void createArticle(Article article) {
        // ... business logic
        validateArticle(article);
        article = addArticleTags(article);
        article = articleRepository.save(article);
        applicationEvents.publishEvent(
          new ArticlePublishedEvent(article.slug(), article.title()));
    }
}


除此之外,我們還將有一個專門的 Kafka 生產者作為我們基礎設施層的一部分。該組件將偵聽 ArticlePublishedEvent並將發佈委托給底層KafkaOperations bean:

@Component
class ArticlePublishedKafkaProducer {
    private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;
    // constructor 
    @EventListener
    public void publish(ArticlePublishedEvent article) {
        Assert.notNull(article.slug(), "Article Slug must not be null!");
        messageProducer.send("baeldung.articles.published", article.splug(), event);
    }
}


通過這種抽象,基礎設施組件現在依賴於領域服務生成的事件。換句話說,我們已經成功地減少了耦合並反轉了源代碼依賴。此外,如果其他模塊對文章創建感興趣,它們現在可以無縫監聽這些應用程序事件並做出相應反應。

原子與原子 非原子操作
現在,讓我們深入研究性能考慮因素。首先,我們必須確定當與消息代理的通信失敗時回滾是否是所需的行為。這種選擇根據具體情況而有所不同。

如果我們不需要這種原子性,則必須釋放數據庫連接並異步發佈事件。為了模擬這一點,我們可以嘗試創建一篇沒有slug 的文章, 導致
ArticlePublishedKafkaProducer::publish失敗:

@Test
void whenPublishingMessageFails_thenArticleIsStillSavedToDB() {
    var article = new Article(null, "Introduction to Spring Boot", "John Doe", "<p> Spring Boot is [...] </p>");
    baeldung.createArticle(article);
    assertThat(repository.findAll())
      .hasSize(1).first()
      .extracting(Article::title, Article::author)
      .containsExactly("Introduction to Spring Boot", "John Doe");
}


如果我們現在運行測試,它將失敗。發生這種情況是因為
ArticlePublishedKafkaProducer拋出異常,導致域服務回滾事務。但是,我們可以通過將@EventListener註解替換為@
TransactionalEventListener和@Async來使事件監聽器異步:

@Async
@TransactionalEventListener
public void publish(ArticlePublishedEvent event) {
    Assert.notNull(event.slug(), "Article Slug must not be null!");
    messageProducer.send("baeldung.articles.published", event);
}


如果我們現在重新運行測試,我們會註意到異常已記錄,事件未發佈,並且實體已保存到數據庫中。而且,數據庫連接釋放得更快,允許其他線程使用它。

使用 Spring Modulith 進行事件外部化
我們通過兩步方法成功解決了原始代碼示例的設計和性能問題:

使用 Spring 應用程序事件進行依賴反轉
利用@
TransactionalEventListener和@Async進行異步發佈

Spring Modulith 允許我們進一步簡化代碼,為該
模式提供內置支持。 讓我們首先將
spring-modulith-events-api的 Maven 依賴項添加到pom.xml中:

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-events-api</artifactId>
    <version>1.1.2</version>
</dependency>


該模塊可以配置為偵聽應用程序事件並自動將它們外部化到各種消息系統。我們將堅持原來的示例並重點關註 Kafka。對於此集成,我們需要添加
spring-modulith-events-kafka依賴項:

<dependency> 
    <groupId>org.springframework.modulith</groupId> 
    <artifactId>spring-modulith-events-kafka</artifactId> 
    <version>1.1.2</version> 
</dependency>


現在,我們需要更新ArticlePublishedEvent 並使用@Externalized對其進行註釋。此註釋需要路由目標的名稱和密鑰。換句話說,Kafka 主題和消息鍵。對於鍵,我們將使用SpEL表達式來調用Article :: slug():

@Externalized("baeldung.article.published::#{slug()}")
public record ArticlePublishedEvent(String slug, String title) {
}

事件外部化配置
盡管@Externalized註釋的值對於簡潔的 SpEL 表達式很有用,但在某些情況下我們可能希望避免使用它:

如果表達式變得過於復雜
當我們的目標是將有關主題的信息與應用程序事件分開時
如果我們想要應用程序事件和外部化事件的不同模型
對於這些用例,我們可以使用
EventExternalizationConfiguration 的構建器配置必要的路由和事件映射。之後,我們隻需將此配置公開為Spring bean:

@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
    return EventExternalizationConfiguration.externalizing()
      .select(EventExternalizationConfiguration.annotatedAsExternalized())
      .route(
        ArticlePublishedEvent.class,
        it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
      )
      .mapping(
        ArticlePublishedEvent.class,
        it -> new ArticlePublishedKafkaEvent(it.slug(), it.title())
      )
      .build();
}


在這種情況下,我們將從ArticlePublishedEvent中刪除路由信息,並保留@Externalized註釋,使其不具有任何值:

@Externalized
public record ArticlePublishedEvent(String slug, String title) {
}

七、結論
在本文中,我們討論了需要我們從事務塊內發佈消息的場景。我們發現這種模式可能會對性能產生很大的影響,因為它可能會長時間阻塞數據庫連接。

之後,我們使用 Spring Modulith 的功能來監聽 Spring 應用程序事件並自動將其發佈到 Kafka 主題。這種方法使我們能夠異步外部化事件並更快地釋放數據庫連接。

使用 Spring Modulith 進行事件外部化 - 極道