Log Management with Spring Boot + InfluxDB

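While developing an aerospace microservice project in our lab, we needed a time-series database for storing and retrieving log data. After surveying the available time-series databases, we settled on InfluxDB. This post records the details of implementing log management with Spring Boot and InfluxDB.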

I. Background

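Before InfluxDB was considered, the whole project stored its data in MongoDB. Because log data differs in character from business data, we eventually chose InfluxDB, which suits it better, as a dedicated store for logs. The project is built on microservices and Spring Boot. Spring Boot has no InfluxDB counterpart to the spring-boot-starter-data-mongodb library, so the database cannot be auto-configured as conveniently, and there are no ready-made interfaces such as MongoRepository. Since the Spring Boot framework was of little help here, I used influxdb-java, the Java client library for InfluxDB.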

II. Source Code

1. InfluxDBUtil.java

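import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

import java.util.Map;
import java.util.concurrent.TimeUnit;

@Data
@Slf4j
public class InfluxDBUtil {
    private String url;
    private String database;
    private InfluxDB influxDB;

    public InfluxDBUtil(String url, String database) {
        this.url = url;
        this.database = database;
        this.influxDB = buildInfluxDB();
    }

    // Connects to the server and makes sure the target database exists.
    private InfluxDB buildInfluxDB() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(url);
        }
        try {
            // databaseExists/createDatabase are marked deprecated in newer influxdb-java 2.x releases.
            if (!influxDB.databaseExists(database)) {
                influxDB.createDatabase(database);
            }
            influxDB.setDatabase(database);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged, not just the message.
            log.error("create influx db failed", e);
        }
        influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
        return influxDB;
    }

    // Runs an InfluxQL command against the configured database.
    public QueryResult query(String command) {
        return influxDB.query(new Query(command, database));
    }

    // Writes one point, timestamped with the current system time.
    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) {
        Point point = Point.measurement(measurement)
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .tag(tags)
                .fields(fields)
                .build();
        influxDB.write(point);
    }
}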

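This is a utility class that creates the InfluxDB instance and exposes insert and query methods. Note that insert stamps every point with the current system time.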
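To make it clearer what insert ends up sending, here is a minimal sketch of how a point is rendered in InfluxDB line protocol. It is a simplified stand-in written for this post, not the influxdb-java implementation: the class name LineProtocolSketch and the method toLine are hypothetical, and only the most common escaping rules are covered. It also skips empty tag values and null fields, which is how I understand recent influxdb-java versions to behave; treat that as an assumption.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical helper for illustration only; influxdb-java does this internally.
final class LineProtocolSketch {

    // Escapes commas, equals signs and spaces in tag keys, tag values and field keys.
    static String escapeKey(String s) {
        return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    // Renders: measurement[,tag=value...] field=value[,field=value...] timestamp
    static String toLine(String measurement, Map<String, String> tags,
                         Map<String, Object> fields, long timestamp) {
        StringBuilder sb = new StringBuilder(
                measurement.replace(",", "\\,").replace(" ", "\\ "));

        // Tags are written in sorted key order; empty values are skipped (assumption).
        for (Map.Entry<String, String> t : new TreeMap<>(tags).entrySet()) {
            if (t.getValue() == null || t.getValue().isEmpty()) {
                continue;
            }
            sb.append(',').append(escapeKey(t.getKey()))
              .append('=').append(escapeKey(t.getValue()));
        }

        StringJoiner fieldSet = new StringJoiner(",");
        for (Map.Entry<String, Object> f : new TreeMap<>(fields).entrySet()) {
            Object v = f.getValue();
            if (v == null) {
                continue; // null fields are not written
            }
            String rendered;
            if (v instanceof String) {
                // String field values are double-quoted.
                rendered = "\"" + ((String) v).replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
            } else if (v instanceof Integer || v instanceof Long) {
                rendered = v + "i"; // integer fields carry an 'i' suffix
            } else {
                rendered = v.toString();
            }
            fieldSet.add(escapeKey(f.getKey()) + "=" + rendered);
        }
        if (fieldSet.length() == 0) {
            throw new IllegalArgumentException("at least one field is required");
        }
        return sb.append(' ').append(fieldSet).append(' ').append(timestamp).toString();
    }
}
```

A log with serviceId "s1", an empty processId and the content "ok" would come out as a single line with one tag and one field, with the timestamp in whatever precision the write request declares (milliseconds in InfluxDBUtil).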

2. InfluxDBConfig.java

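import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// @Configuration is the idiomatic home for @Bean methods (the original used @Component).
@Configuration
public class InfluxDBConfig {
    @Value("${spring.influx.url}")
    private String url;

    @Value("${spring.influx.database}")
    private String database;

    @Bean
    public InfluxDBUtil influxDBUtil() {
        return new InfluxDBUtil(url, database);
    }
}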

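Configuration class for InfluxDBUtil; it registers a single InfluxDBUtil bean (a singleton). In Spring, @Bean is a method-level annotation that produces a Spring-managed bean whose id is the method name. The configuration above is roughly equivalent to the following XML configuration: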

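<beans>
    <bean id="influxDBUtil" class="cn.ist.msplatform.logservice.util.InfluxDBUtil">
        <constructor-arg value="${spring.influx.url}"/>
        <constructor-arg value="${spring.influx.database}"/>
    </bean>
</beans>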

3. Log.java

import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.ser.InstantSerializer;
import lombok.Data;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@Data
@Measurement(name = "log")
public class Log {
    @Column(name = "time")
    @JsonSerialize(using = InstantSerializer.class)
    private Instant time;

    @Column(name = "content")
    private String content;
    @Column(name = "datasourceName")
    private String datasourceName;
    @Column(name = "serviceName")
    private String serviceName;
    @Column(name = "apiPath")
    private String apiPath;
    @Column(name = "apiMethod")
    private String apiMethod;

    @Column(name = "type", tag = true)
    private String type;
    @Column(name = "processId", tag = true)
    private String processId;
    @Column(name = "datasourceId", tag = true)
    private String datasourceId;
    @Column(name = "serviceId", tag = true)
    private String serviceId;
    @Column(name = "apiId", tag = true)
    private String apiId;

    // Collects the tag values, replacing null with an empty string.
    public Map<String, String> tags() {
        Map<String, String> tags = new HashMap<>(8);
        tags.put("type", Optional.ofNullable(this.getType()).orElse(""));
        tags.put("processId", Optional.ofNullable(this.getProcessId()).orElse(""));
        tags.put("datasourceId", Optional.ofNullable(this.getDatasourceId()).orElse(""));
        tags.put("serviceId", Optional.ofNullable(this.getServiceId()).orElse(""));
        tags.put("apiId", Optional.ofNullable(this.getApiId()).orElse(""));
        return tags;
    }

    // Collects the field values; the map holds five entries, so size it accordingly.
    public Map<String, Object> fields() {
        Map<String, Object> fields = new HashMap<>(8);
        fields.put("content", this.content);
        fields.put("datasourceName", this.datasourceName);
        fields.put("serviceName", this.serviceName);
        fields.put("apiPath", this.apiPath);
        fields.put("apiMethod", this.apiMethod);
        return fields;
    }
}

Model layer. From top to bottom, the properties of Log fall into three groups: time, fields, and tags.

4. LogRepository.java

@Repository
public class LogRepository {
    @Autowired
    private InfluxDBUtil influxDBUtil;

    private final InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();

    public Log save(Log log) {
        influxDBUtil.insert("log", log.tags(), log.fields());
        return log;
    }

    public List<Log> findAll() {
        String command = "select * from log";
        return resultMapper.toPOJO(influxDBUtil.query(command), Log.class);
    }

    // NOTE: the ids below are interpolated into the query as-is, so callers must pass trusted values.
    public List<Log> findServiceLogLimit(String serviceId, int num) {
        String command = String.format(
                "select * from log where serviceId='%s' and processId='' order by time desc limit %d",
                serviceId, num);
        return resultMapper.toPOJO(influxDBUtil.query(command), Log.class);
    }

    public List<Log> findServiceApiLogLimit(String serviceId, String apiId, int num) {
        String command = String.format(
                "select * from log where serviceId='%s' and apiId='%s' and processId='' order by time desc limit %d",
                serviceId, apiId, num);
        return resultMapper.toPOJO(influxDBUtil.query(command), Log.class);
    }

    public List<Log> findDatasourceLogLimit(String datasourceId, int num) {
        String command = String.format(
                "select * from log where datasourceId='%s' and processId='' order by time desc limit %d",
                datasourceId, num);
        return resultMapper.toPOJO(influxDBUtil.query(command), Log.class);
    }

    // Counts logs per day (Asia/Shanghai days) for one service in the range [start, end).
    public List<ServiceLogNumPerDayDTO> findServiceLogNumPerDay(String start, String end, String serviceId) {
        String command = String.format(
                "select count(*) from log where time >= '%s' and time < '%s' and serviceId = '%s' group by time(1d) tz('Asia/Shanghai')",
                start, end, serviceId);
        return resultMapper.toPOJO(influxDBUtil.query(command), ServiceLogNumPerDayDTO.class);
    }
}

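Repository layer. The field on line 4 of the code receives the singleton influxDBUtil configured in InfluxDBConfig.java (note that beans declared with @Bean, @Service, @Controller and so on are singletons by default; the IoC container finds exactly one bean of type InfluxDBUtil, so it is injected automatically). Query results are mapped to entities with the InfluxDBResultMapper class.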
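One caveat with building queries through String.format is that an id containing a single quote changes the meaning of the InfluxQL statement. Below is a minimal sketch of quoting string literals before interpolation. The class InfluxQlSketch and its methods are hypothetical names for illustration, and the escaping follows my reading of InfluxQL, where a backslash escapes a single quote inside a single-quoted string. influxdb-java also ships a BoundParameterQuery for binding parameters, which may be the better choice if your version has it.

```java
// Hypothetical helper for illustration; not part of influxdb-java or the project.
final class InfluxQlSketch {

    // Wraps a value in single quotes, escaping backslashes and single quotes.
    static String quote(String value) {
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }

    // Builds the same query as findServiceLogLimit, with the id quoted safely.
    static String recentServiceLogs(String serviceId, int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        return String.format(
                "select * from log where serviceId=%s and processId='' order by time desc limit %d",
                quote(serviceId), limit);
    }
}
```

The repository methods could then call influxDBUtil.query(InfluxQlSketch.recentServiceLogs(serviceId, num)) instead of formatting the raw id into the string.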

5. LogService.java

@Service
public class LogService {
    @Autowired
    SendService sendService;

    @Autowired
    private LogRepository logRepository;

    @Value("${detect.datasource.statusNum}")
    private Integer datasourceStatusNum;

    @Value("${detect.service.statusNum}")
    private Integer serviceStatusNum;

    public Log createLog(Log log) {
        // Logs that belong to a process are also forwarded through SendService.
        if (log.getProcessId() != null) {
            sendService.send(log);
        }
        return logRepository.save(log);
    }

    public List<Log> createLogs(List<Log> logs) {
        List<Log> result = new ArrayList<>();
        for (Log log : logs) {
            result.add(createLog(log));
        }
        return result;
    }

    public List<Log> findAll() {
        return logRepository.findAll();
    }

    public List<Log> getRecentServiceLog(String serviceId, String apiId, Integer num) {
        if (apiId == null) {
            return logRepository.findServiceLogLimit(serviceId, num);
        } else {
            return logRepository.findServiceApiLogLimit(serviceId, apiId, num);
        }
    }

    public List<Log> getRecentDatasourceLog(String datasourceId, Integer num) {
        return logRepository.findDatasourceLogLimit(datasourceId, num);
    }

    public List<LogStatusDTO> getDatasourceStatus(String id) {
        return toStatus(logRepository.findDatasourceLogLimit(id, datasourceStatusNum));
    }

    public List<LogStatusDTO> getServiceStatus(String serviceId, String apiId) {
        return toStatus(logRepository.findServiceApiLogLimit(serviceId, apiId, serviceStatusNum));
    }

    // Maps each log to a status entry carrying only its timestamp and content.
    private List<LogStatusDTO> toStatus(List<Log> recentLogs) {
        List<LogStatusDTO> status = new ArrayList<>();
        for (Log recentLog : recentLogs) {
            status.add(LogStatusDTO.builder().timestamp(recentLog.getTime())
                    .content(recentLog.getContent()).build());
        }
        return status;
    }

    public List<ServiceLogNumPerDayDTO> findServiceLogNumPerDay(String start, String end, String serviceId) {
        return logRepository.findServiceLogNumPerDay(start, end, serviceId);
    }
}

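Service layer. It mostly delegates to the repository; createLog additionally forwards any log that carries a processId through SendService.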

6. LogController.java

@RestController
public class LogController {
    @Autowired
    private LogService logService;

    @PostMapping(value = "/log")
    @ApiOperation(value = "Create a log")
    public Result<Log> createLog(@RequestBody Log log) {
        return ResultUtil.success(logService.createLog(log));
    }

    @PostMapping(value = "/logs")
    @ApiOperation(value = "Create logs in batch")
    public Result<List<Log>> createLogs(@RequestBody List<Log> logs) {
        return ResultUtil.success(logService.createLogs(logs));
    }

    @GetMapping(value = "/log")
    @ApiOperation(value = "Find all logs")
    public Result<List<Log>> findAll() {
        return ResultUtil.success(logService.findAll());
    }

    @GetMapping(value = "/log/recent/serviceLog")
    @ApiOperation(value = "Find the most recent logs of a given service")
    public Result<List<Log>> findRecentServiceLog(@RequestParam(name = "serviceId") String serviceId,
                                                  @RequestParam(name = "apiId", required = false) String apiId,
                                                  @RequestParam(name = "num", required = false, defaultValue = "5") Integer num) {
        return ResultUtil.success(logService.getRecentServiceLog(serviceId, apiId, num));
    }

    @GetMapping(value = "/log/recent/datasourceLog")
    @ApiOperation(value = "Find the most recent logs of a given data source")
    public Result<List<Log>> findRecentDatasourceLog(@RequestParam(name = "datasourceId") String datasourceId,
                                                     @RequestParam(name = "num", required = false, defaultValue = "5") Integer num) {
        return ResultUtil.success(logService.getRecentDatasourceLog(datasourceId, num));
    }

    @GetMapping(value = "/datasource/{id}/status")
    @ApiOperation(value = "Monitor the recent status of a given data source")
    public Result<List<LogStatusDTO>> getDatasourceStatus(@PathVariable("id") String id) {
        return ResultUtil.success(logService.getDatasourceStatus(id));
    }

    @GetMapping(value = "/service/{serviceId}/{apiId}/status")
    @ApiOperation(value = "Monitor the recent status of a given service")
    public Result<List<LogStatusDTO>> getServiceStatus(@PathVariable("serviceId") String serviceId,
                                                       @PathVariable("apiId") String apiId) {
        return ResultUtil.success(logService.getServiceStatus(serviceId, apiId));
    }

    @GetMapping(value = "/log/timestamp")
    @ApiOperation(value = "Count the logs of a given service per day within a time range")
    public Result<List<ServiceLogNumPerDayDTO>> findServiceLogNumPerDay(@RequestParam(name = "start") Date start,
                                                                        @RequestParam(name = "end") Date end,
                                                                        @RequestParam(name = "serviceId") String serviceId) {
        // The pattern hard-codes +08:00, so compute and format dates in the matching zone, not the JVM default.
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        Calendar calendar = new GregorianCalendar(shanghai);
        calendar.setTime(end);
        calendar.add(Calendar.DATE, 1); // make the end day inclusive by querying up to the next midnight
        end = calendar.getTime();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'00:00:00+08:00");
        sdf.setTimeZone(shanghai);
        return ResultUtil.success(logService.findServiceLogNumPerDay(sdf.format(start), sdf.format(end), serviceId));
    }
}

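Controller layer. It exposes the service methods as REST endpoints; the only logic of its own is in the last endpoint, which turns the start and end dates into day boundaries at +08:00 and extends the end by one day so that the end date is included.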
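The day-boundary computation in the last endpoint can also be written with java.time, which makes the time zone explicit instead of relying on a hard-coded offset in the format pattern. This is a minimal sketch under that assumption; the class DayRange and the method startOfDay are hypothetical names, not part of the project.

```java
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;

// Hypothetical helper for illustration; not part of the original project.
final class DayRange {
    // The zone matches the tz('Asia/Shanghai') clause used in the InfluxQL query.
    static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");
    static final DateTimeFormatter RFC3339 =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX");

    // Returns midnight (in Asia/Shanghai) of the day containing 'date', shifted by plusDays.
    static String startOfDay(Date date, int plusDays) {
        LocalDate day = date.toInstant().atZone(ZONE).toLocalDate().plusDays(plusDays);
        return day.atStartOfDay(ZONE).format(RFC3339);
    }
}
```

The endpoint could then pass DayRange.startOfDay(start, 0) as the inclusive lower bound and DayRange.startOfDay(end, 1) as the exclusive upper bound.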

Author: Moon Lou
Link: https://loumoon.github.io/2020/08/07/SpringBoot+InfluxDB实现日志管理/
Copyright: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit Moon's Blog when reposting.