在实验室航天微服务项目的开发中,需要引入时序数据库进行日志数据的存取。经过事先对时序数据库的调研,最终选择采用InfluxDB。本文记录一下使用Spring Boot+InfluxDB实现日志管理的具体细节。
 
 
一、需求背景
在考虑引入InfluxDB之前,整个项目都使用MongoDB进行数据存储,但是考虑到日志数据相对于业务数据的特点,最终选择了更适合的InfluxDB专门存放日志数据。项目是基于微服务和Spring Boot框架构建的,而Spring Boot中并没有与MongoDB的spring-boot-starter-data-mongodb相对应的InfluxDB starter,因此既不能借助Spring Boot的自动配置方便地接入数据库,也没有像MongoRepository那样现成的数据访问接口可用。既然指望不上Spring Boot框架,我采用的是InfluxDB的Java客户端库influxdb-java。
二、源代码 1. InfluxDBUtil.java @Data @Slf 4jpublic  class  InfluxDBUtil   {    private  String url;     private  String database;     private  InfluxDB influxDB;     public  InfluxDBUtil (String url, String database)   {         this .url = url;         this .database = database;         this .influxDB = buildInfluxDB();     }     private  InfluxDB buildInfluxDB ()   {         if  (influxDB == null ) {             influxDB = InfluxDBFactory.connect(url);         }         try  {             if  (!influxDB.databaseExists(database)) {                 influxDB.createDatabase(database);             }             influxDB.setDatabase(database);         } catch  (Exception e) {             log.error("create influx db failed, error: {}" , e.getMessage());         }         influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);         return  influxDB;     }     public  QueryResult query (String command)   {         return  influxDB.query(new  Query(command, database));     }     public  void  insert (String measurement, Map<String, String> tags, Map<String, Object> fields)   {         Point point = Point.measurement(measurement)                 .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)                 .tag(tags)                 .fields(fields)                 .build();         influxDB.write(point);     } } 
 
这是一个工具类,提供InfluxDB实例的创建以及插入和查询接口。
2. InfluxDBConfig.java
/**
 * Registers the {@link InfluxDBUtil} bean, built from the {@code spring.influx.url}
 * and {@code spring.influx.database} properties.
 *
 * <p>NOTE(review): the class is annotated with {@code @Component} rather than
 * {@code @Configuration}; confirm that this is intended.
 */
@Component
public class InfluxDBConfig {
    /** Value of the {@code spring.influx.url} property. */
    @Value("${spring.influx.url}")
    private String influxUrl;

    /** Value of the {@code spring.influx.database} property. */
    @Value("${spring.influx.database}")
    private String influxDatabase;

    /**
     * Creates the {@link InfluxDBUtil} bean from the injected properties.
     *
     * @return a new {@code InfluxDBUtil} for the configured URL and database
     */
    @Bean
    public InfluxDBUtil influxDBUtil() {
        return new InfluxDBUtil(influxUrl, influxDatabase);
    }
}
 
InfluxDBUtil的配置类,配置一个InfluxDBUtil的实例bean(单例)。在Spring框架中@Bean是一个方法级别的注解,会生成一个由Spring管理的bean实例,该bean实例的id默认为方法名。上面代码中的配置就相当于以下的xml配置:
<beans>
    <bean id="influxDBUtil" class="cn.ist.msplatform.logservice.util.InfluxDBUtil"/>
</beans>
 
3. Log.java
/**
 * Log entry stored in the {@code log} measurement.
 *
 * <p>Members are declared in three groups: the timestamp, the field columns, and the
 * tag columns (those marked {@code tag = true}).
 */
@Data
@Measurement(name = "log")
public class Log {
    /** Timestamp column of the point. */
    @Column(name = "time")
    @JsonSerialize(using = InstantSerializer.class)
    private Instant time;

    // ---- field columns ----
    @Column(name = "content")
    private String content;

    @Column(name = "datasourceName")
    private String datasourceName;

    @Column(name = "serviceName")
    private String serviceName;

    @Column(name = "apiPath")
    private String apiPath;

    @Column(name = "apiMethod")
    private String apiMethod;

    // ---- tag columns ----
    @Column(name = "type", tag = true)
    private String type;

    @Column(name = "processId", tag = true)
    private String processId;

    @Column(name = "datasourceId", tag = true)
    private String datasourceId;

    @Column(name = "serviceId", tag = true)
    private String serviceId;

    @Column(name = "apiId", tag = true)
    private String apiId;

    /**
     * Collects the tag values of this entry; a {@code null} value is replaced by the
     * empty string.
     *
     * @return a new mutable map keyed by tag name
     */
    public Map<String, String> tags() {
        Map<String, String> tagValues = new HashMap<>(5);
        tagValues.put("type", emptyIfNull(this.getType()));
        tagValues.put("processId", emptyIfNull(this.getProcessId()));
        tagValues.put("datasourceId", emptyIfNull(this.getDatasourceId()));
        tagValues.put("serviceId", emptyIfNull(this.getServiceId()));
        tagValues.put("apiId", emptyIfNull(this.getApiId()));
        return tagValues;
    }

    /**
     * Collects the field values of this entry as they are; {@code null} values are
     * kept as {@code null}.
     *
     * @return a new mutable map keyed by field name
     */
    public Map<String, Object> fields() {
        Map<String, Object> fieldValues = new HashMap<>(4);
        fieldValues.put("content", this.content);
        fieldValues.put("datasourceName", this.datasourceName);
        fieldValues.put("serviceName", this.serviceName);
        fieldValues.put("apiPath", this.apiPath);
        fieldValues.put("apiMethod", this.apiMethod);
        return fieldValues;
    }

    /** Returns {@code value}, or the empty string when {@code value} is {@code null}. */
    private static String emptyIfNull(String value) {
        return value == null ? "" : value;
    }
}
 
Model层,Log的所有属性从上到下分为time、fields、tags三类。
4. LogRepository.java
/**
 * Repository for {@link Log} entries stored in the {@code log} measurement.
 *
 * <p>Queries are assembled as text and run through {@link InfluxDBUtil#query(String)}.
 * Every caller-supplied string is escaped before it is placed inside a single-quoted
 * literal, so an id containing a quote cannot change the structure of the query.
 */
@Repository
public class LogRepository {
    @Autowired
    private InfluxDBUtil influxDBUtil;

    /** Maps query results onto annotated classes. */
    private InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();

    /**
     * Writes the tags and fields of the given log to the {@code log} measurement.
     *
     * @param log entry to store
     * @return the same {@code log} instance
     */
    public Log save(Log log) {
        influxDBUtil.insert("log", log.tags(), log.fields());
        return log;
    }

    /**
     * Reads every entry of the {@code log} measurement.
     *
     * @return all stored logs
     */
    public List<Log> findAll() {
        String command = "select * from log";
        return resultMapper.toPOJO(influxDBUtil.query(command), Log.class);
    }

    /**
     * Finds the latest logs of a service whose {@code processId} tag is empty.
     *
     * @param serviceId service id to match
     * @param num       maximum number of rows
     * @return matching logs, newest first
     */
    public List<Log> findServiceLogLimit(String serviceId, int num) {
        String command = String.format(
                "select * from log where serviceId='%s' and processId='' order by time desc limit %d",
                escapeStringLiteral(serviceId), num);
        return resultMapper.toPOJO(influxDBUtil.query(command), Log.class);
    }

    /**
     * Finds the latest logs of one API of a service whose {@code processId} tag is empty.
     *
     * @param serviceId service id to match
     * @param apiId     API id to match
     * @param num       maximum number of rows
     * @return matching logs, newest first
     */
    public List<Log> findServiceApiLogLimit(String serviceId, String apiId, int num) {
        String command = String.format(
                "select * from log where serviceId='%s' and apiId='%s' and processId='' order by time desc limit %d",
                escapeStringLiteral(serviceId), escapeStringLiteral(apiId), num);
        return resultMapper.toPOJO(influxDBUtil.query(command), Log.class);
    }

    /**
     * Finds the latest logs of a datasource whose {@code processId} tag is empty.
     *
     * @param datasourceId datasource id to match
     * @param num          maximum number of rows
     * @return matching logs, newest first
     */
    public List<Log> findDatasourceLogLimit(String datasourceId, int num) {
        String command = String.format(
                "select * from log where datasourceId='%s' and processId='' order by time desc limit %d",
                escapeStringLiteral(datasourceId), num);
        return resultMapper.toPOJO(influxDBUtil.query(command), Log.class);
    }

    /**
     * Counts the logs of a service per day (Asia/Shanghai time zone) in {@code [start, end)}.
     *
     * @param start     inclusive lower time bound, as a timestamp string
     * @param end       exclusive upper time bound, as a timestamp string
     * @param serviceId service id to match
     * @return one result row per day bucket
     */
    public List<ServiceLogNumPerDayDTO> findServiceLogNumPerDay(String start, String end, String serviceId) {
        String command = String.format(
                "select count(*) from log where time >= '%s' and time < '%s' and serviceId = '%s' group by time(1d) tz('Asia/Shanghai')",
                escapeStringLiteral(start), escapeStringLiteral(end), escapeStringLiteral(serviceId));
        return resultMapper.toPOJO(influxDBUtil.query(command), ServiceLogNumPerDayDTO.class);
    }

    /**
     * Escapes a value for use inside a single-quoted InfluxQL string literal: backslash,
     * single quote and newline are backslash-escaped. A {@code null} input is returned
     * as {@code null}, so it is still formatted as the text {@code null}, as before.
     */
    private static String escapeStringLiteral(String value) {
        if (value == null) {
            return null;
        }
        StringBuilder escaped = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '\\':
                    escaped.append("\\\\");
                    break;
                case '\'':
                    escaped.append("\\'");
                    break;
                case '\n':
                    escaped.append("\\n");
                    break;
                default:
                    escaped.append(c);
                    break;
            }
        }
        return escaped.toString();
    }
}
 
Repository层,通过@Autowired标注的字段把InfluxDBConfig.java中配置的单例influxDBUtil注入进来(注意@Bean和@Service、@Controller等默认都是单例的,IOC容器中InfluxDBUtil类型的bean只有这一个,因此可以按类型自动注入)。从查询结果到实体的映射使用InfluxDBResultMapper类实现。
5. LogService.java
/**
 * Service layer for logs: stores entries through {@link LogRepository} and exposes the
 * "recent logs", "status" and "logs per day" lookups.
 */
@Service
public class LogService {
    @Autowired
    SendService sendService;

    @Autowired
    private LogRepository logRepository;

    /** Number of recent logs returned by the datasource status lookup. */
    @Value("${detect.datasource.statusNum}")
    private Integer datasourceStatusNum;

    /** Number of recent logs returned by the service status lookup. */
    @Value("${detect.service.statusNum}")
    private Integer serviceStatusNum;

    /**
     * Stores one log. A log with a non-null {@code processId} is first handed to
     * {@code sendService.send}.
     *
     * @param log entry to store
     * @return the value returned by the repository
     */
    public Log createLog(Log log) {
        boolean hasProcessId = log.getProcessId() != null;
        if (hasProcessId) {
            sendService.send(log);
        }
        return logRepository.save(log);
    }

    /**
     * Stores each log in order through {@link #createLog(Log)}.
     *
     * @param logs entries to store
     * @return the stored entries, in input order
     */
    public List<Log> createLogs(List<Log> logs) {
        List<Log> created = new ArrayList<>();
        for (Log entry : logs) {
            Log stored = createLog(entry);
            created.add(stored);
        }
        return created;
    }

    /**
     * @return every stored log
     */
    public List<Log> findAll() {
        return logRepository.findAll();
    }

    /**
     * Returns the latest logs of a service, limited to one API when {@code apiId} is given.
     *
     * @param serviceId service id
     * @param apiId     API id, or {@code null} for all APIs of the service
     * @param num       maximum number of logs
     * @return matching logs
     */
    public List<Log> getRecentServiceLog(String serviceId, String apiId, Integer num) {
        if (apiId != null) {
            return logRepository.findServiceApiLogLimit(serviceId, apiId, num);
        }
        return logRepository.findServiceLogLimit(serviceId, num);
    }

    /**
     * Returns the latest logs of a datasource.
     *
     * @param datasourceId datasource id
     * @param num          maximum number of logs
     * @return matching logs
     */
    public List<Log> getRecentDatasourceLog(String datasourceId, Integer num) {
        return logRepository.findDatasourceLogLimit(datasourceId, num);
    }

    /**
     * Returns the time and content of the latest {@code datasourceStatusNum} logs of a datasource.
     *
     * @param id datasource id
     * @return one status entry per log
     */
    public List<LogStatusDTO> getDatasourceStatus(String id) {
        return toStatusList(logRepository.findDatasourceLogLimit(id, datasourceStatusNum));
    }

    /**
     * Returns the time and content of the latest {@code serviceStatusNum} logs of a service API.
     *
     * @param serviceId service id
     * @param apiId     API id
     * @return one status entry per log
     */
    public List<LogStatusDTO> getServiceStatus(String serviceId, String apiId) {
        return toStatusList(logRepository.findServiceApiLogLimit(serviceId, apiId, serviceStatusNum));
    }

    /**
     * Delegates the per-day log count to the repository.
     *
     * @param start     lower time bound
     * @param end       upper time bound
     * @param serviceId service id
     * @return per-day counts
     */
    public List<ServiceLogNumPerDayDTO> findServiceLogNumPerDay(String start, String end, String serviceId) {
        return logRepository.findServiceLogNumPerDay(start, end, serviceId);
    }

    /** Converts each log into a status entry that carries its time and content. */
    private List<LogStatusDTO> toStatusList(List<Log> recentLogs) {
        List<LogStatusDTO> statusList = new ArrayList<>();
        for (Log recent : recentLogs) {
            LogStatusDTO entry = LogStatusDTO.builder()
                    .timestamp(recent.getTime())
                    .content(recent.getContent())
                    .build();
            statusList.add(entry);
        }
        return statusList;
    }
}
 
Service层,没什么好说的。
6. LogController.java
/**
 * REST endpoints for creating and querying logs; every handler delegates to
 * {@link LogService} and wraps the result with {@code ResultUtil.success}.
 */
@RestController
public class LogController {
    @Autowired
    private LogService logService;

    /** Creates one log from the request body. */
    @PostMapping(value = "/log")
    @ApiOperation(value = "创建日志")
    public Result<Log> createLog(@RequestBody Log log) {
        Log created = logService.createLog(log);
        return ResultUtil.success(created);
    }

    /** Creates several logs from the request body. */
    @PostMapping(value = "/logs")
    @ApiOperation(value = "批量创建日志")
    public Result<List<Log>> createLogs(@RequestBody List<Log> logs) {
        List<Log> created = logService.createLogs(logs);
        return ResultUtil.success(created);
    }

    /** Returns all logs. */
    @GetMapping(value = "/log")
    @ApiOperation(value = "查找所有日志")
    public Result<List<Log>> findAll() {
        List<Log> all = logService.findAll();
        return ResultUtil.success(all);
    }

    /** Returns the latest {@code num} (default 5) logs of a service, optionally of one API. */
    @GetMapping(value = "/log/recent/serviceLog")
    @ApiOperation(value = "查找指定服务的最近若干条日志")
    public Result<List<Log>> findRecentServiceLog(
            @RequestParam(name = "serviceId") String serviceId,
            @RequestParam(name = "apiId", required = false) String apiId,
            @RequestParam(name = "num", required = false, defaultValue = "5") Integer num) {
        List<Log> recent = logService.getRecentServiceLog(serviceId, apiId, num);
        return ResultUtil.success(recent);
    }

    /** Returns the latest {@code num} (default 5) logs of a datasource. */
    @GetMapping(value = "/log/recent/datasourceLog")
    @ApiOperation(value = "查找指定数据源的最近若干条日志")
    public Result<List<Log>> findRecentDatasourceLog(
            @RequestParam(name = "datasourceId") String datasourceId,
            @RequestParam(name = "num", required = false, defaultValue = "5") Integer num) {
        List<Log> recent = logService.getRecentDatasourceLog(datasourceId, num);
        return ResultUtil.success(recent);
    }

    /** Returns the recent status entries of a datasource. */
    @GetMapping(value = "/datasource/{id}/status")
    @ApiOperation(value = "监控指定数据源的最近状态")
    public Result<List<LogStatusDTO>> getDatasourceStatus(@PathVariable("id") String id) {
        List<LogStatusDTO> status = logService.getDatasourceStatus(id);
        return ResultUtil.success(status);
    }

    /** Returns the recent status entries of one API of a service. */
    @GetMapping(value = "/service/{serviceId}/{apiId}/status")
    @ApiOperation(value = "监控指定服务的最近状态")
    public Result<List<LogStatusDTO>> getServiceStatus(
            @PathVariable("serviceId") String serviceId,
            @PathVariable("apiId") String apiId) {
        List<LogStatusDTO> status = logService.getServiceStatus(serviceId, apiId);
        return ResultUtil.success(status);
    }

    /**
     * Returns the per-day log count of a service between two dates.
     *
     * <p>{@code end} is moved forward by one day before formatting. Both bounds are
     * formatted as midnight of their calendar day with a fixed {@code +08:00} suffix.
     * NOTE(review): the calendar day is taken in the JVM default time zone, because no
     * zone is set on the formatter or the calendar — confirm that this is intended.
     */
    @GetMapping(value = "/log/timestamp")
    @ApiOperation(value = "在指定时间内,按每天特定服务的日志数量")
    public Result<List<ServiceLogNumPerDayDTO>> findServiceLogNumPerDay(
            @RequestParam(name = "start") Date start,
            @RequestParam(name = "end") Date end,
            @RequestParam(name = "serviceId") String serviceId) {
        // Shift the upper bound to the day after the requested end date.
        Calendar dayAfterEnd = new GregorianCalendar();
        dayAfterEnd.setTime(end);
        dayAfterEnd.add(Calendar.DATE, 1);
        Date shiftedEnd = dayAfterEnd.getTime();

        SimpleDateFormat midnightFormat = new SimpleDateFormat("yyyy-MM-dd'T'00:00:00+08:00");
        String from = midnightFormat.format(start);
        String to = midnightFormat.format(shiftedEnd);
        return ResultUtil.success(logService.findServiceLogNumPerDay(from, to, serviceId));
    }
}
 
Controller层,没什么好说的。