cgo+ffmpeg+截图 的简单尝试


概述

首先这里的截图是针对视频流而言的,通常是rtmp的流。

命令行截图:使用FFmpeg来进行固定时间间隔截图(rtmp直播流)

一般情况下,会通过go代码调用命令行(CMD)来使用ffmpeg进行截图,但是这样的问题在于每一个截图任务都是一个独立的进程, 如果要同时对几千路流进行截图,那么进程间的开销会直接占用大半的CPU资源。

使用命令行调用ffmpeg的另一个问题在于,由于是另外一个独立的进程在进行截图,所以go程序无法直接知道截图的情况,整个截图过程都是异步的。 go程序无法知道什么时候ffmpeg完成了一张截图。简单的方法只能定期扫描ffmpeg产生的list文件来判断是否有新的截图产生,就挺恶心的。

使用cgo+ffmpeg+截图的方式来解决上述问题,做到以下几点

  1. 每一个截图任务都运行在一个go程中,而不是进程。
  2. 一张截图完成时,可以直接进行回调。
  3. 可以实时控制截图任务的开始和结束。
  4. go代码与c代码使用同一个日志打印函数,日志将用同样的格式打印到同样的地方。
  5. 单个截图任务的CPU消耗基本与直接命令行截图相同。

这样的方式也会有一定坏处

  1. 会增加一定CPU开销。go与c之间的函数调用还是会带来一定额外开销的。
  2. 一个截图任务发生panic,如果不处理,会导致整个程序崩溃。

如何调用c函数来进行截图

首先,c没有类的概念,都是基于结构体和函数调用。

这里尝试了两种路线来调用c函数进行截图:


路线1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type CSnapshot struct {
pToCStruct *C.SnapShot // 指向 c 结构体的指针
}
// 初始化
func NewCSnapshot() *CSnapshot {
cs.pToCStruct = C.InitSnapshotStruct()
return cs
}
// 开始截图
func (cs *CSnapshot) Start() {
C.SnapShotStructRun(cs.pToCStruct)
}
// 停止截图
func (cs *CSnapshot) Stop() {
C.SnapShotStructStop(cs.pToCStruct)
}

首先进行结构体初始化,得到一个结构体指针,然后将指针传入RunStop函数来控制截图任务的开始与结束。


路线2:

1
2
3
4
5
6
7
8
9
func (cs *CSnapshot) Start() {
taskIdC := C.CString(cs.taskId)
ret := C.SnapShotStructRun(taskIdC) // 阻塞
}
func (cs *CSnapshot) Stop() {
taskIdC := C.CString(cs.taskId)
C.StopTaskForGo(taskIdC)
}

直接调用Run函数运行截图任务,传入一个taskId对任务进行标识,且这个函数会阻塞。 传入taskId到Stop函数来停止任务。

这个过程中,go内存中并不保存任何c中的变量。


在实际调试过程,发现第一种方式会出现奇怪的问题,结构体经过几次函数传递之后,struct内的string变成了乱码,貌似是char*指针地址直接变了。 这个问题最后也没看懂,应该是比较底层的问题。

后来就选择了第二种方式,虽然简陋,但是不会有问题。


c函数如何将截图回调给go

在使用cgo的时候,go也没有办法把一个成员函数直接传递给c来进行调用,所以简单的使用一个全局函数来给c进行调用:

1
2
3
4
5
6
//export SnapshotCallBackForC
func SnapshotCallBackForC(taskId, filePath string) {
// 用于c语言回调,貌似只能这么猥琐的来写了
// 截图成功则调用这个函数来通知go
fmt.Printf("taskId %s, filePath %s\n", taskId, filePath)
}

如何让c函数日志打印到go的日志函数中

统一日志打印,整个程序调试起来会简单很多。

首先go将日志函数export一下:

1
2
3
4
5
6
7
8
9
10
import "C"
//export InfoForC
func InfoForC(message string) {
if loglevel > LevelInfo {
return
}
logger.Info(message)
}

在c中进行一下包装,可以十分方便的直接使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#define LogDebug(fmt, ...) GoDebug(__FILE__, __FUNCTION__, __LINE__, fmt, ##__VA_ARGS__)
#define LogInfo(fmt, ...) GoInfo(__FILE__, __FUNCTION__, __LINE__, fmt, ##__VA_ARGS__)
#define LogWarn(fmt, ...) GoWarn(__FILE__, __FUNCTION__, __LINE__, fmt, ##__VA_ARGS__)
#define LogError(fmt, ...) GoError(__FILE__, __FUNCTION__, __LINE__, fmt, ##__VA_ARGS__)
void GoInfo(const char* file, const char* func, int line, char* fmt, ...) {
int i, j;
char buf[MSG_MAX_LEN];
i = snprintf(buf, MSG_MAX_LEN, "[%s][%s:%d] ", file, func, line);
va_list arglist;
va_start(arglist, fmt);
j = vsnprintf(&buf[i], MSG_MAX_LEN - i, fmt, arglist);
va_end(arglist);
GoString go_str = { p: buf, n : i + j };
InfoForC(go_str);
}

这样就可以在c中使用go的日志打印函数,可以将文件名、函数名和行数都打印出来,对于调试十分友好。


go如何立即停止c中运行的截图任务

截图任务运行在另一个go程之中,这里通过简单的修改c的全局变量来改变截图任务的状态,让它在检测到时停止运行。

go函数调用c函数来修改c函数状态:

1
2
3
4
5
6
func (cs *CSnapshot) Stop() {
taskIdC := C.CString(cs.taskId)
defer C.free(unsafe.Pointer(taskIdC))
C.StopTaskForGo(taskIdC)
}

c函数将任务状态存入一个map之中:

1
2
3
4
5
6
7
8
9
10
void setTaskState(char* taskId, int state) {
pthread_mutex_lock(&taskMapMute);
map_set(&taskStateMap, taskId, state);
pthread_mutex_unlock(&taskMapMute);
}
void StopTaskForGo(char* taskId) {
setTaskState(taskId, STOP);
}

每解码一个frame,就去检测一下task的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int* getTaskState(char* taskId) {
pthread_mutex_lock(&taskMapMute);
int* val = map_get(&taskStateMap, taskId);
pthread_mutex_unlock(&taskMapMute);
return val;
}
int isTaskStateRunning(char* taskId) {
int* state = getTaskState(taskId);
if (state == NULL) {
return 0;
}
return *state == RUNNING;
}
while (av_read_frame(ss->ifmt_ctx, ss->dePkt) >= 0) {
if (!isTaskStateRunning(ss->taskId)) {
LogWarn("------------- task %s state is stopped, break the loop -------------", ss->taskId);
break;
}
...
}

如何防止av_read_frame出现阻塞

如何什么都不设置,那么一旦直播流没了,也就是av_read_frame获取不到更多的数据的时候,它就会阻塞在哪里, 如果这时候想要结束这个任务,上面的全局变量就没用了。

为此,得在结束任务的时候,让av_read_frame也跳出阻塞。这里可以使用interrupt_callback来达到这个目的。

在ffmpeg的AVFormatContext结构体中,存在一个AVIOInterruptCB结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
typedef struct AVFormatContext {
/**
* Custom interrupt callbacks for the I/O layer.
*
* demuxing: set by the user before avformat_open_input().
* muxing: set by the user before avformat_write_header()
* (mainly useful for AVFMT_NOFILE formats). The callback
* should also be passed to avio_open2() if it's used to
* open the file.
*/
AVIOInterruptCB interrupt_callback;
}
/**
* Callback for checking whether to abort blocking functions.
* AVERROR_EXIT is returned in this case by the interrupted
* function. During blocking operations, callback is called with
* opaque as parameter. If the callback returns 1, the
* blocking operation will be aborted.
*
* No members can be added to this struct without a major bump, if
* new elements have been added after this struct in AVFormatContext
* or AVIOContext.
*/
typedef struct AVIOInterruptCB {
int (*callback)(void*);
void *opaque;
} AVIOInterruptCB;

所以这里设置一个回调函数,在任务状态为Stop的时候返回1即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int isTaskStateRunning(char* taskId) {
int* state = getTaskState(taskId);
if (state == NULL) {
return 0;
}
return *state == RUNNING;
}
int interruptCallBack(void* taskId) {
if (isTaskStateRunning((char*)taskId)) {
return 0;
}
return 1;
}
ss->ifmt_ctx = avformat_alloc_context();
ss->ifmt_ctx->interrupt_callback.callback = interruptCallBack;
ss->ifmt_ctx->interrupt_callback.opaque = ss->taskId;

通过上面的方法,就可以通过go立即停止正在运行的c截图任务。


如何自定义ffmpeg的打印函数

直接可以通过av_log_set_callback函数来自定义日志函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void initFFLogCallBack() {
pthread_mutex_lock(&ffLogInitMute);
if (!ffLogInit) {
ffLogInit = 1;
av_log_set_callback(LogForFF);
}
pthread_mutex_unlock(&ffLogInitMute);
}
typedef enum FF_LOG_LEVEL {
FF_ERROR,
FF_WARN,
FF_INFO,
FF_DEBUG,
} FF_LOG_LEVEL;
void SetFFLogLevel(enum FF_LOG_LEVEL level) {
ff_log_level = level;
}
void LogForFF(void* ptr, int level, const char* fmt, va_list vl) {
if (level >= AV_LOG_DEBUG) {
if (ff_log_level >= FF_DEBUG) {
LogDebug(fmt, vl);
}
}
else if (level >= AV_LOG_INFO) {
if (ff_log_level >= FF_INFO) {
LogInfo(fmt, vl);
}
}
else if (level >= AV_LOG_WARNING) {
if (ff_log_level >= FF_WARN) {
LogWarn(fmt, vl);
}
}
else if (level >= AV_LOG_QUIET) {
if (ff_log_level >= FF_ERROR) {
LogError(fmt, vl);
}
}
else {
LogError(fmt, vl);
}
}

虽然这样可以使得ffmpeg也使用go的日志函数来进行打印,但是通常情况下ffmpeg的日志实在太多了,根本没法看,所以不太建议使用。


如何优化cgo的运行效率

在写这个代码的过程中,经过反复测试,发现要尽量减少go和c之间的相互调用

最开始,任务状态检测时,之间让c调用go的函数来进行检测,这样每一次收到一个packet,就会调用一下go的函数。 结果CPU使用率增加了近一倍多(相比于直接纯c语言截图)。

后来就改成了上面的方式,c中使用一个全局的map来存储任务的状态,每一次收到一个packet,直接去检测这个全局变量,而不是调用go函数, CPU使用率就回到了正常的水平。


---------------------------------END---------------------------------