`
sillycat
  • 浏览: 2539704 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Spring Boot and RESTful API(14)Spring Boot with AKKA

 
阅读更多
Spring Boot and RESTful API(14)Spring Boot with AKKA

Logging Performance
https://www.slf4j.org/faq.html#logging_performance
logger.debug("The new entry is {}. It replaces {}.", entry, oldEntry);

Add AKKA to my Spring Boot Project
pom.xml

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.4.19</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-slf4j_2.12</artifactId>
    <version>2.4.19</version>
</dependency>

In the main class, system will initiate the AKKA system.
package com.sillycat.jobsmonitorapi;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.sillycat.jobsmonitorapi.akka.base.SpringExtension;
import com.sillycat.jobsmonitorapi.service.JobDynamicCronService;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

@SpringBootApplication
@EnableScheduling
public class JobsMonitorAPIApplication extends SpringBootServletInitializer {

    private static final Logger logger = LoggerFactory.getLogger(JobsMonitorAPIApplication.class);

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(JobsMonitorAPIApplication.class);

        logger.info("Start to init the AKKA system...");
        SpringExtension ext = context.getBean(SpringExtension.class);
        ActorSystem system = context.getBean(ActorSystem.class);
        ActorRef supervisor = system.actorOf(ext.props("outOfBudgetSupervisor").withMailbox("akka.priority-mailbox"),
                "outOfBudgetSupervisor");
        logger.info("supervisor init with path {}", supervisor.path());
        logger.info("AKKA system success inited...");
        logger.info("Start the cron tasks to monitor ");
        context.getBean(JobDynamicCronService.class).startCron();
        logger.info("Cron tasks started! ");
    }

}

In the configuration Directory, system will auto set up the ENV
package com.sillycat.jobsmonitorapi.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

import com.sillycat.jobsmonitorapi.akka.base.SpringExtension;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import akka.actor.ActorSystem;

@Configuration
@Lazy
@EnableAutoConfiguration
@ComponentScan(basePackages = { "com.sillycat.jobsmonitorapi.akka" })
public class ApplicationConfiguration {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private SpringExtension springExtension;

    @Bean
    public ActorSystem actorSystem() {
        ActorSystem system = ActorSystem.create("JobsMonitorAPIAKKA", akkaConfiguration());
        springExtension.initialize(applicationContext);
        return system;
    }

    @Bean
    public Config akkaConfiguration() {
        return ConfigFactory.load();
    }

}

My system will be named as JobsMonitorAPIAKKA, some base class will help Actor use Spring and Spring Bean find Actor
package com.sillycat.jobsmonitorapi.akka.base;

import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import com.sillycat.jobsmonitorapi.akka.base.SpringActorProducer;
import akka.actor.Extension;
import akka.actor.Props;

@Component
public class SpringExtension implements Extension {

    private ApplicationContext applicationContext;

    public void initialize(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    public Props props(String actorBeanName) {
        return Props.create(SpringActorProducer.class, applicationContext, actorBeanName);
    }

}


package com.sillycat.jobsmonitorapi.akka.base;

import org.springframework.context.ApplicationContext;
import akka.actor.Actor;
import akka.actor.IndirectActorProducer;

public class SpringActorProducer implements IndirectActorProducer {

    private final ApplicationContext applicationContext;
    private final String actorBeanName;

    public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName) {
        this.applicationContext = applicationContext;
        this.actorBeanName = actorBeanName;
    }

    @Override
    public Actor produce() {
        return (Actor) applicationContext.getBean(actorBeanName);
    }

    @SuppressWarnings("unchecked")
    @Override
    public Class<? extends Actor> actorClass() {
        return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
    }

}

Customized PriorityMaiBox Policy help system to priority the messages
package com.sillycat.jobsmonitorapi.akka.base;

import com.sillycat.jobsmonitorapi.akka.msg.OutOfBudgetJobMsg;
import com.typesafe.config.Config;

import akka.actor.ActorSystem;
import akka.dispatch.PriorityGenerator;
import akka.dispatch.UnboundedPriorityMailbox;

public class PriorityMailbox extends UnboundedPriorityMailbox {

    public PriorityMailbox(ActorSystem.Settings settings, Config config) {

        // Create a new PriorityGenerator, lower priority means more important
        super(new PriorityGenerator() {

            @Override
            public int gen(Object message) {
                if (message instanceof OutOfBudgetJobMsg) {
                    return ((OutOfBudgetJobMsg) message).getPriority();
                } else {
                    // default
                    return 100;
                }
            }
        });

    }
}

Message POJO will be as normal, just add one priority column
package com.j2c.jobsmonitorapi.akka.msg;

public class OutOfBudgetJobMsg {

private String id;

private Integer sourceID;

private Integer campaignID;

private String jobReference;

private Boolean paused;

private Boolean dailyCapped;

private Boolean deduped;

private Boolean expired;

private Integer priority;

public OutOfBudgetJobMsg(String id) {
this.id = id;
this.priority = 10;
}
public OutOfBudgetJobMsg(String id, Integer sourceID, Integer campaignID, String jobReference, Boolean paused,
Boolean dailyCapped, Boolean deduped, Boolean expired) {
this.id = id;
this.sourceID = sourceID;
this.campaignID = campaignID;
this.jobReference = jobReference;
this.paused = paused;
this.dailyCapped = dailyCapped;
this.deduped = deduped;
this.expired = expired;
this.priority = 10;
}
…snip...

}

Here is my Actor
package com.sillycat.jobsmonitorapi.akka.actor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import com.sillycat.jobsmonitorapi.akka.msg.OutOfBudgetJobMsg;
import com.sillycat.jobsmonitorapi.domain.JobCloud;
import com.sillycat.jobsmonitorapi.repository.JobCloudPartialRepositorySolr;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

@Component
@Scope("prototype")
public class OutOfBudgetActor extends UntypedActor {

    private final LoggingAdapter logger = Logging.getLogger(getContext().system(), "TaskProcessor");

    @Autowired
    JobCloudPartialRepositorySolr jobCloudPartialRepositorySolr;

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof OutOfBudgetJobMsg) {
            logger.debug("Message we receive is {}", message);
            OutOfBudgetJobMsg msg = (OutOfBudgetJobMsg) message;
            jobCloudPartialRepositorySolr.update(new JobCloud(msg.getId(), msg.getSourceID(), msg.getCampaignID(),
                    msg.getJobReference(), msg.getPaused(), msg.getDailyCapped(), msg.getDeduped(), msg.getExpired()),
                    false);
            // String id, Integer sourceID, Integer campaignID, String
            // jobReference, Boolean paused,
            // Boolean dailyCapped, Boolean deduped, Boolean expired
            logger.debug("Update the status back to SOLR ");
        } else {
            logger.error("outOfBudgetActor receive msg = {} msg type = {}", message, message.getClass());
            unhandled(message);
        }

    }

}

Here is my SupervisorActor
package com.sillycat.jobsmonitorapi.akka.actor;

import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import com.sillycat.jobsmonitorapi.akka.base.SpringExtension;
import com.sillycat.jobsmonitorapi.akka.msg.OutOfBudgetJobMsg;

import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.routing.ActorRefRoutee;
import akka.routing.Routee;
import akka.routing.Router;
import akka.routing.SmallestMailboxRoutingLogic;

@Component
@Scope("prototype")
public class OutOfBudgetSupervisor extends UntypedActor {

    private final LoggingAdapter logger = Logging.getLogger(getContext().system(), "OutOfBudgetSupervisor");

    @Autowired
    private SpringExtension springExtension;

    private Router router;
   
    @Value("${outofBudgetActorCount}")
    private Integer outofBudgetActorCount;

    public void preStart() throws Exception {
        logger.info("Init the operation on OutOfBudget...");
        List<Routee> routees = new ArrayList<Routee>();
        for (int i = 0; i < this.outofBudgetActorCount; i++) {
            ActorRef actor = getContext().actorOf(springExtension.props("outOfBudgetActor"));
            getContext().watch(actor);
            routees.add(new ActorRefRoutee(actor));
        }
        router = new Router(new SmallestMailboxRoutingLogic(), routees);
        super.preStart();
        logger.info("Init the operation on OutOfBudget success!");
    }

    public void onReceive(Object msg) throws Exception {
        if (msg instanceof OutOfBudgetJobMsg) {
            router.route(msg, getSender());
        } else if (msg instanceof Terminated) {
            router = router.removeRoutee(((Terminated) msg).actor());
            ActorRef actor = getContext().actorOf(springExtension.props("outOfBudgetActor"));
            getContext().watch(actor);
            router = router.addRoutee(new ActorRefRoutee(actor));
        } else {
            logger.error("Unable to handle msg {}", msg);
        }
    }

    public void postStop() throws Exception {
        logger.info("Shutting down OutOfBudgetSupervisor");
        super.postStop();
    }
}

The configuration file application.conf will be as follow:
akka {

  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # Log level used by the configured loggers (see "loggers") as soon
  # as they have been started; before that, see "stdout-loglevel"
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  loglevel = "DEBUG"

  # Log level for the very basic logger activated during ActorSystem startup.
  # This logger prints the log messages to stdout (System.out).
  # Options: OFF, ERROR, WARNING, INFO, DEBUG
  stdout-loglevel = "DEBUG"

  priority-mailbox {
    mailbox-type = "com.sillycat.jobsmonitorapi.akka.base.PriorityMailbox"
  }

}

References:
http://kimrudolph.de/blog/spring-boot-meets-akka
https://github.com/krudolph/akkaflow
https://www.slf4j.org/faq.html#logging_performance
分享到:
评论

相关推荐

    机械手控制系统:使用Java,Spring Boot和Docker的Akka群集。 会议演讲的演示应用

    在本项目中,Spring Boot可能被用来处理HTTP请求,管理数据库连接,提供RESTful API,以及实现服务的自动化配置。 Docker是一种流行的容器化技术,它允许开发者将应用及其依赖打包成轻量级的容器,方便在不同的环境...

    springboot-reactive-tdd:在Spring Boot上使用Cucumber,MVC,Reactive Restful,Reactive WebSocket,Akka Actor等进行TDD

    使用Spring Boot进行全栈应用程序的测试驱动开发 参见Github页面( ) 该项目解释并演示了在Spring Boot中使用TDD方法创建完整堆栈应用程序的必要步骤。 该项目的当前状态如下: 技术展示 堆叠位置 技术重点 迭代...

    ivc-server:IVC报告工具的后端服务器

    Spring Boot提供了自动配置、内嵌的Tomcat或Jetty服务器、以及对各种常用库的开箱即用支持,如数据库连接、RESTful API设计等。在"ivc-server"中,我们可能能看到Spring Boot的运用,它使得开发者可以快速搭建功能...

    编排基于微服务的流程_Scala_Java_下载.zip

    微服务架构是一种将单体应用拆分为一组小型、独立的服务的方法,每个服务都可以在其自己的进程中运行,与其它服务通过轻量级通信机制(如HTTP/RESTful API)进行交互。这种架构风格有助于提高系统的可伸缩性、可维护...

    15种类微服务架构框架分析.docx

    微服务架构是一种软件开发方法,它提倡将单一应用程序分解为一组小的服务,每个服务都运行在其自己的进程中,服务之间通过轻量级的方式通信,通常采用HTTP RESTful API。这种架构模式近年来得到了广泛应用,因为它...

    常见的后端开发框架及其特点

    - **特点**:Spring Boot 是基于 Spring 框架的快速开发工具包,极大简化了 Java EE 应用的开发过程。 - **适用场景**:特别适合于微服务架构下的应用开发,可以快速搭建并部署服务。 2. **Java EE(Jakarta EE)...

    JAVA 框

    8. **并发和异步处理框架**:Akka和Reactor是响应式编程的代表,它们利用Java 8的Stream API和CompletableFuture来处理高并发和非阻塞IO。 9. **RESTful API框架**:如 Jersey 和 RESTEasy,它们帮助开发者创建符合...

    Web服务:Consumiendo una API

    Spring Boot简化了配置,并提供了丰富的库,如Spring Web和Spring Web MVC,用于构建RESTful服务。对于消费API,Spring的RestTemplate类提供了方便的API调用功能,而Retrofit是另一个流行的选择,它提供了一种类型...

    java:java框架与知识

    Spring MVC是目前最流行的Java Web框架,它结合了Spring IoC容器,提供了全面的MVC功能,支持RESTful API开发。 2. ORM框架:Hibernate、MyBatis等,它们用于处理数据库操作,使开发者能以面向对象的方式编写SQL,...

    高并发、分布式、高可用、微服务、海量数据处理知识

    Spring Boot和Spring Cloud是Java生态系统中实现微服务的主流框架,它们提供了服务发现、配置中心、熔断、限流等功能。 海量数据处理涉及大数据技术,如Hadoop、Spark和Flink。Hadoop的HDFS提供了分布式存储,...

    Java网络编程与分布式计算

    3. Spring Cloud:一套基于Spring Boot的微服务框架,包含服务发现、配置中心、熔断机制等组件,支持构建分布式系统。 4. Akka:使用Actor模型的并发框架,可在Java和Scala中使用,有助于构建高并发、容错的分布式...

    SDIS14_NRT:Projet SDIS14 de NosRégionson t人才

    9. **API设计**:项目可能涉及与其他系统的交互,因此RESTful API设计原则和使用JSON格式的数据交换可能会被应用。 10. **性能优化**:为了确保高效运行,可能需要进行性能监控和调优,例如使用JProfiler或VisualVM...

    UrbanRuralEwallet

    同时,Spring Boot简化了项目的初始化和配置过程,使得开发团队可以更快地投入到功能实现中。 数据库管理方面,项目可能使用了Java Persistence API (JPA) 和Hibernate,这两个工具使得与数据库的交互变得更加简单...

    30种java技术框架-方案架构图汇总.zip

    2. **Spring Boot**:基于Spring的简化版启动框架,它通过默认配置和starter pom简化了项目的初始化和配置,使得快速开发成为可能。 3. **Hibernate**:是一个强大的ORM(对象关系映射)框架,简化了Java应用与...

    java抢票系统源码-unicenta:独角兽

    5. **RESTful API**:考虑到抢票系统的分布式特性,系统可能会使用REST(Representational State Transfer)风格的API来实现前后端分离,允许前端通过HTTP请求获取和更新票务数据。 6. **异步编程**:为了提高性能...

    evo:演进-智能物联网云

    Java提供的丰富的库和框架,如Spring Boot、MyBatis等,可能被用于构建后端服务,处理设备连接、数据流和业务逻辑。此外,考虑到物联网平台通常需要处理大量的实时数据,可能会使用到Java的并发处理和大数据处理技术...

Global site tag (gtag.js) - Google Analytics