在实验室航天微服务项目的开发中,需要引入时序数据库进行日志数据的存取,经过事先的时序数据库调研 最终选择采用InfluxDB。本文记录一下使用Spring Boot+InfluxDB实现日志管理的具体细节。
一、需求背景 在考虑引入InfluxDB之前,整个项目都使用MongoDB进行数据存储,但是考虑到日志数据相对于业务数据的特点,最终选择了更适合的InfluxDB用来专门存放日志数据。项目是基于微服务和Spring Boot框架构建的,由于在Spring Boot的框架中,InfluxDB不存在类似MongoDB的spring-boot-starter-data-mongodb库,因此不能很方便地对Spring Boot自动配置数据库,也没有像MongoDB那样很方便的接口(比如MongoRepository等)可用。既然指望不上Spring Boot框架,我采用的是InfluxDB的Java客户端库influxdb-java 。
二、源代码 1. InfluxDBUtil.java @Data @Slf 4jpublic class InfluxDBUtil { private String url; private String database; private InfluxDB influxDB; public InfluxDBUtil (String url, String database) { this .url = url; this .database = database; this .influxDB = buildInfluxDB(); } private InfluxDB buildInfluxDB () { if (influxDB == null ) { influxDB = InfluxDBFactory.connect(url); } try { if (!influxDB.databaseExists(database)) { influxDB.createDatabase(database); } influxDB.setDatabase(database); } catch (Exception e) { log.error("create influx db failed, error: {}" , e.getMessage()); } influxDB.setLogLevel(InfluxDB.LogLevel.BASIC); return influxDB; } public QueryResult query (String command) { return influxDB.query(new Query(command, database)); } public void insert (String measurement, Map<String, String> tags, Map<String, Object> fields) { Point point = Point.measurement(measurement) .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) .tag(tags) .fields(fields) .build(); influxDB.write(point); } }
这是一个工具类,提供InfluxDB实例的创建以及插入和查询接口。
2. InfluxDBConfig.java @Component public class InfluxDBConfig { @Value ("${spring.influx.url}" ) private String url; @Value ("${spring.influx.database}" ) private String database; @Bean public InfluxDBUtil influxDBUtil () { return new InfluxDBUtil(url, database); } }
InfluxDBUtil的配置类,配置一颗InfluxDBUtil的实例bean(单例)。在Spring框架中@Bean是一个方法级别的注解,会生成一个Spring管理的bean实例,该bean实例的id为方法名。上面代码中的配置就相当于以下的xml配置:
<beans > <bean id ="influxDBUtil" class ="cn.ist.msplatform.logservice.util.InfluxDBUtil" /> </beans >
3. Log.java @Data @Measurement (name = "log" )public class Log { @Column (name = "time" ) @JsonSerialize (using = InstantSerializer.class) private Instant time; @Column (name = "content" ) private String content; @Column (name = "datasourceName" ) private String datasourceName; @Column (name = "serviceName" ) private String serviceName; @Column (name = "apiPath" ) private String apiPath; @Column (name = "apiMethod" ) private String apiMethod; @Column (name = "type" , tag = true ) private String type; @Column (name = "processId" , tag = true ) private String processId; @Column (name = "datasourceId" , tag = true ) private String datasourceId; @Column (name = "serviceId" , tag = true ) private String serviceId; @Column (name = "apiId" , tag = true ) private String apiId; public Map<String, String> tags () { Map<String, String> tags = new HashMap<>(5 ); tags.put("type" , Optional.ofNullable(this .getType()).orElse("" )); tags.put("processId" , Optional.ofNullable(this .getProcessId()).orElse("" )); tags.put("datasourceId" , Optional.ofNullable(this .getDatasourceId()).orElse("" )); tags.put("serviceId" , Optional.ofNullable(this .getServiceId()).orElse("" )); tags.put("apiId" , Optional.ofNullable(this .getApiId()).orElse("" )); return tags; } public Map<String, Object> fields () { Map<String, Object> fields = new HashMap<>( 4 ); fields.put("content" , this .content); fields.put("datasourceName" , this .datasourceName); fields.put("serviceName" , this .serviceName); fields.put("apiPath" , this .apiPath); fields.put("apiMethod" , this .apiMethod); return fields; } }
Model层,Log的所有属性从上到下分为time、fields、tags三类。
4. LogRepository.java @Repository public class LogRepository { @Autowired private InfluxDBUtil influxDBUtil; private InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); public Log save (Log log) { influxDBUtil.insert("log" , log.tags(), log.fields()); return log; } public List<Log> findAll () { String command = "select * from log" ; return resultMapper.toPOJO(influxDBUtil.query(command), Log.class); } public List<Log> findServiceLogLimit (String serviceId, int num) { Object[] args = new Object[]{serviceId, num}; String command = String.format("select * from log where serviceId='%s' and processId='' order by time desc limit %d" , args); return resultMapper.toPOJO(influxDBUtil.query(command), Log.class); } public List<Log> findServiceApiLogLimit (String serviceId, String apiId, int num) { Object[] args = new Object[]{serviceId, apiId, num}; String command = String.format("select * from log where serviceId='%s' and apiId='%s' and processId='' order by time desc limit %d" , args); return resultMapper.toPOJO(influxDBUtil.query(command), Log.class); } public List<Log> findDatasourceLogLimit (String datasourceId, int num) { Object[] args = new Object[]{datasourceId, num}; String command = String.format("select * from log where datasourceId='%s' and processId='' order by time desc limit %d" , args); return resultMapper.toPOJO(influxDBUtil.query(command), Log.class); } public List<ServiceLogNumPerDayDTO> findServiceLogNumPerDay (String start, String end, String serviceId) { Object[] args = new Object[]{start, end, serviceId}; String command = String.format("select count(*) from log where time >= '%s' and time < '%s' and serviceId = '%s' group by time(1d) tz('Asia/Shanghai')" , args); return resultMapper.toPOJO(influxDBUtil.query(command), ServiceLogNumPerDayDTO.class); } }
Repository层,代码第4行把InfluxDBConfig.java中配置的单例influxDBUtil注入进来(注意@Bean和@Service、@Controller等都是默认配置单例的,IOC容器检测到InfluxDBUtil.class就只有一个单例,因此就自动注入了进来)。从数据到实体的映射使用InfluxDBResultMapper接口实现。
5. LogService.java @Service public class LogService { @Autowired SendService sendService; @Autowired private LogRepository logRepository; @Value ("${detect.datasource.statusNum}" ) private Integer datasourceStatusNum; @Value ("${detect.service.statusNum}" ) private Integer serviceStatusNum; public Log createLog (Log log) { if (log.getProcessId() != null ) { sendService.send(log); } return logRepository.save(log); } public List<Log> createLogs (List<Log> logs) { List<Log> result = new ArrayList<>(); for (Log log : logs) { result.add(createLog(log)); } return result; } public List<Log> findAll () { return logRepository.findAll(); } public List<Log> getRecentServiceLog (String serviceId, String apiId, Integer num) { if (apiId == null ) { return logRepository.findServiceLogLimit(serviceId, num); } else { return logRepository.findServiceApiLogLimit(serviceId, apiId, num); } } public List<Log> getRecentDatasourceLog (String datasourceId, Integer num) { return logRepository.findDatasourceLogLimit(datasourceId,num); } public List<LogStatusDTO> getDatasourceStatus (String id) { List<Log> recentLogs = logRepository.findDatasourceLogLimit(id,datasourceStatusNum); List<LogStatusDTO> status = new ArrayList<>(); for (Log recentLog : recentLogs) { status.add(LogStatusDTO.builder().timestamp(recentLog.getTime()) .content(recentLog.getContent()).build()); } return status; } public List<LogStatusDTO> getServiceStatus (String serviceId, String apiId) { List<Log> recentLogs = logRepository.findServiceApiLogLimit(serviceId, apiId, serviceStatusNum); List<LogStatusDTO> status = new ArrayList<>(); for (Log recentLog : recentLogs) { status.add(LogStatusDTO.builder().timestamp(recentLog.getTime()) .content(recentLog.getContent()).build()); } return status; } public List<ServiceLogNumPerDayDTO> findServiceLogNumPerDay (String start, String end, String serviceId) { return logRepository.findServiceLogNumPerDay(start, end, serviceId); } }
Service层,没什么好说的。
6. LogController.java @RestController public class LogController { @Autowired private LogService logService; @PostMapping (value = "/log" ) @ApiOperation (value = "创建日志" ) public Result<Log> createLog (@RequestBody Log log) { return ResultUtil.success(logService.createLog(log)); } @PostMapping (value = "/logs" ) @ApiOperation (value = "批量创建日志" ) public Result<List<Log>> createLogs(@RequestBody List<Log> logs) { return ResultUtil.success(logService.createLogs(logs)); } @GetMapping (value = "/log" ) @ApiOperation (value = "查找所有日志" ) public Result<List<Log>> findAll() { return ResultUtil.success(logService.findAll()); } @GetMapping (value = "/log/recent/serviceLog" ) @ApiOperation (value = "查找指定服务的最近若干条日志" ) public Result<List<Log>> findRecentServiceLog(@RequestParam (name = "serviceId" ) String serviceId, @RequestParam (name = "apiId" ,required=false ) String apiId, @RequestParam (name="num" ,required = false , defaultValue = "5" ) Integer num) { return ResultUtil.success(logService.getRecentServiceLog(serviceId, apiId, num)); } @GetMapping (value = "/log/recent/datasourceLog" ) @ApiOperation (value = "查找指定数据源的最近若干条日志" ) public Result<List<Log>> findRecentDatasourceLog(@RequestParam (name = "datasourceId" ) String datasourceId, @RequestParam (name = "num" ,required = false , defaultValue = "5" ) Integer num) { return ResultUtil.success(logService.getRecentDatasourceLog(datasourceId, num)); } @GetMapping (value = "/datasource/{id}/status" ) @ApiOperation (value = "监控指定数据源的最近状态" ) public Result<List<LogStatusDTO>> getDatasourceStatus(@PathVariable ("id" ) String id) { return ResultUtil.success(logService.getDatasourceStatus(id)); } @GetMapping (value = "/service/{serviceId}/{apiId}/status" ) @ApiOperation (value = "监控指定服务的最近状态" ) public Result<List<LogStatusDTO>> getServiceStatus(@PathVariable ("serviceId" ) String serviceId, @PathVariable ("apiId" ) String apiId) { return ResultUtil.success(logService.getServiceStatus(serviceId,apiId)); } @GetMapping (value="/log/timestamp" ) @ApiOperation (value = "在指定时间内,按每天特定服务的日志数量" ) public Result<List<ServiceLogNumPerDayDTO>> findServiceLogNumPerDay(@RequestParam (name = "start" ) Date start, @RequestParam (name = "end" ) Date end, @RequestParam (name = "serviceId" ) String serviceId){ Calendar calendar = new GregorianCalendar(); calendar.setTime(end); calendar.add(Calendar.DATE, 1 ); end = calendar.getTime(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'00:00:00+08:00" ); return ResultUtil.success(logService.findServiceLogNumPerDay(sdf.format(start), sdf.format(end), serviceId)); } }
Controller层,没什么好说的。