FFmpeg
f_zmq.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Stefano Sabatini
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with FFmpeg; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 /**
22  * @file
23  * receive commands through libzeromq and broker them to filters
24  */
25 
26 #include <zmq.h>
27 #include "libavutil/avstring.h"
28 #include "libavutil/bprint.h"
29 #include "libavutil/opt.h"
30 #include "avfilter.h"
31 #include "internal.h"
32 #include "audio.h"
33 #include "video.h"
34 
35 typedef struct ZMQContext {
36  const AVClass *class;
37  void *zmq;
38  void *responder;
39  char *bind_address;
41 } ZMQContext;
42 
43 #define OFFSET(x) offsetof(ZMQContext, x)
44 #define FLAGS AV_OPT_FLAG_FILTERING_PARAM | AV_OPT_FLAG_AUDIO_PARAM | AV_OPT_FLAG_VIDEO_PARAM
45 static const AVOption options[] = {
46  { "bind_address", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
47  { "b", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
48  { NULL }
49 };
50 
52 {
53  ZMQContext *zmq = ctx->priv;
54 
55  zmq->zmq = zmq_ctx_new();
56  if (!zmq->zmq) {
58  "Could not create ZMQ context: %s\n", zmq_strerror(errno));
59  return AVERROR_EXTERNAL;
60  }
61 
62  zmq->responder = zmq_socket(zmq->zmq, ZMQ_REP);
63  if (!zmq->responder) {
65  "Could not create ZMQ socket: %s\n", zmq_strerror(errno));
66  return AVERROR_EXTERNAL;
67  }
68 
69  if (zmq_bind(zmq->responder, zmq->bind_address) == -1) {
71  "Could not bind ZMQ socket to address '%s': %s\n",
72  zmq->bind_address, zmq_strerror(errno));
73  return AVERROR_EXTERNAL;
74  }
75 
76  zmq->command_count = -1;
77  return 0;
78 }
79 
81 {
82  ZMQContext *zmq = ctx->priv;
83 
84  zmq_close(zmq->responder);
85  zmq_ctx_destroy(zmq->zmq);
86 }
87 
/**
 * A command split into its three fields, in the order they are parsed.
 * Each field is a separately allocated string.
 */
typedef struct Command {
    char *target;
    char *command;
    char *arg;
} Command;

/* Characters that end a token when a command string is split. */
#define SPACES " \f\t\n\r"
93 
94 static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
95 {
96  const char **buf = &command_str;
97 
98  cmd->target = av_get_token(buf, SPACES);
99  if (!cmd->target || !cmd->target[0]) {
100  av_log(log_ctx, AV_LOG_ERROR,
101  "No target specified in command '%s'\n", command_str);
102  return AVERROR(EINVAL);
103  }
104 
105  cmd->command = av_get_token(buf, SPACES);
106  if (!cmd->command || !cmd->command[0]) {
107  av_log(log_ctx, AV_LOG_ERROR,
108  "No command specified in command '%s'\n", command_str);
109  return AVERROR(EINVAL);
110  }
111 
112  cmd->arg = av_get_token(buf, SPACES);
113  return 0;
114 }
115 
116 static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
117 {
118  ZMQContext *zmq = ctx->priv;
119  zmq_msg_t msg;
120  int ret = 0;
121 
122  if (zmq_msg_init(&msg) == -1) {
124  "Could not initialize receive message: %s\n", zmq_strerror(errno));
125  return AVERROR_EXTERNAL;
126  }
127 
128  if (zmq_msg_recv(&msg, zmq->responder, ZMQ_DONTWAIT) == -1) {
129  if (errno != EAGAIN)
131  "Could not receive message: %s\n", zmq_strerror(errno));
133  goto end;
134  }
135 
136  *buf_size = zmq_msg_size(&msg) + 1;
137  *buf = av_malloc(*buf_size);
138  if (!*buf) {
139  ret = AVERROR(ENOMEM);
140  goto end;
141  }
142  memcpy(*buf, zmq_msg_data(&msg), *buf_size - 1);
143  (*buf)[*buf_size-1] = 0;
144 
145 end:
146  zmq_msg_close(&msg);
147  return ret;
148 }
149 
151 {
152  AVFilterContext *ctx = inlink->dst;
153  ZMQContext *zmq = ctx->priv;
154 
155  while (1) {
156  char cmd_buf[1024];
157  char *recv_buf, *send_buf;
158  int recv_buf_size;
159  Command cmd = {0};
160  int ret;
161 
162  /* receive command */
163  if (recv_msg(ctx, &recv_buf, &recv_buf_size) < 0)
164  break;
165  zmq->command_count++;
166 
167  /* parse command */
168  if (parse_command(&cmd, recv_buf, ctx) < 0) {
169  av_log(ctx, AV_LOG_ERROR, "Could not parse command #%d\n", zmq->command_count);
170  goto end;
171  }
172 
173  /* process command */
175  "Processing command #%d target:%s command:%s arg:%s\n",
176  zmq->command_count, cmd.target, cmd.command, cmd.arg);
178  cmd.target, cmd.command, cmd.arg,
179  cmd_buf, sizeof(cmd_buf),
181  send_buf = av_asprintf("%d %s%s%s",
182  -ret, av_err2str(ret), cmd_buf[0] ? "\n" : "", cmd_buf);
183  if (!send_buf) {
184  ret = AVERROR(ENOMEM);
185  goto end;
186  }
188  "Sending command reply for command #%d:\n%s\n",
189  zmq->command_count, send_buf);
190  if (zmq_send(zmq->responder, send_buf, strlen(send_buf), 0) == -1)
191  av_log(ctx, AV_LOG_ERROR, "Failed to send reply for command #%d: %s\n",
192  zmq->command_count, zmq_strerror(ret));
193 
194  end:
195  av_freep(&send_buf);
196  av_freep(&recv_buf);
197  recv_buf_size = 0;
198  av_freep(&cmd.target);
199  av_freep(&cmd.command);
200  av_freep(&cmd.arg);
201  }
202 
203  return ff_filter_frame(ctx->outputs[0], ref);
204 }
205 
206 AVFILTER_DEFINE_CLASS_EXT(zmq, "(a)zmq", options);
207 
208 #if CONFIG_ZMQ_FILTER
209 
210 static const AVFilterPad zmq_inputs[] = {
211  {
212  .name = "default",
213  .type = AVMEDIA_TYPE_VIDEO,
214  .filter_frame = filter_frame,
215  },
216 };
217 
218 static const AVFilterPad zmq_outputs[] = {
219  {
220  .name = "default",
221  .type = AVMEDIA_TYPE_VIDEO,
222  },
223 };
224 
225 const AVFilter ff_vf_zmq = {
226  .name = "zmq",
227  .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
228  .init = init,
229  .uninit = uninit,
230  .priv_size = sizeof(ZMQContext),
231  FILTER_INPUTS(zmq_inputs),
232  FILTER_OUTPUTS(zmq_outputs),
233  .priv_class = &zmq_class,
234 };
235 
236 #endif
237 
#if CONFIG_AZMQ_FILTER

/* Single audio input; every incoming frame goes through filter_frame(). */
static const AVFilterPad azmq_inputs[] = {
    {
        .type         = AVMEDIA_TYPE_AUDIO,
        .name         = "default",
        .filter_frame = filter_frame,
    },
};

/* Single audio output. */
static const AVFilterPad azmq_outputs[] = {
    {
        .type = AVMEDIA_TYPE_AUDIO,
        .name = "default",
    },
};

/* Audio variant of the filter; it uses the same class, callbacks and context. */
const AVFilter ff_af_azmq = {
    .name        = "azmq",
    .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
    .init        = init,
    .uninit      = uninit,
    .priv_class  = &zmq_class,
    .priv_size   = sizeof(ZMQContext),
    FILTER_INPUTS(azmq_inputs),
    FILTER_OUTPUTS(azmq_outputs),
};

#endif
AVFILTER_CMD_FLAG_ONE
#define AVFILTER_CMD_FLAG_ONE
Stop once a filter understood the command (for target=all for example), fast filters are favored auto...
Definition: avfilter.h:739
AV_LOG_WARNING
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:186
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
opt.h
av_get_token
char * av_get_token(const char **buf, const char *term)
Unescape the given string until a non escaped terminating char, and return the token corresponding to...
Definition: avstring.c:151
ff_filter_frame
int ff_filter_frame(AVFilterLink *link, AVFrame *frame)
Send a frame of data to the next filter.
Definition: avfilter.c:1018
inlink
The exact code depends on how similar the blocks are and how related they are to the and needs to apply these operations to the correct inlink or outlink if there are several Macros are available to factor that when no extra processing is inlink
Definition: filter_design.txt:212
av_asprintf
char * av_asprintf(const char *fmt,...)
Definition: avstring.c:113
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:317
AVOption
AVOption.
Definition: opt.h:247
options
static const AVOption options[]
Definition: f_zmq.c:45
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:196
ff_af_azmq
const AVFilter ff_af_azmq
AVFilter::name
const char * name
Filter name.
Definition: avfilter.h:169
ZMQContext
Definition: f_zmq.c:35
video.h
parse_command
static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
Definition: f_zmq.c:94
ZMQContext::responder
void * responder
Definition: f_zmq.c:38
av_malloc
#define av_malloc(s)
Definition: tableprint_vlc.h:31
Command::target
char * target
Definition: f_sendcmd.c:83
Command::command
char * command
Definition: f_sendcmd.c:83
AVFilterPad
A filter pad used for either input or output.
Definition: internal.h:50
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:180
av_cold
#define av_cold
Definition: attributes.h:90
AVMEDIA_TYPE_AUDIO
@ AVMEDIA_TYPE_AUDIO
Definition: avutil.h:202
filter_frame
static int filter_frame(AVFilterLink *inlink, AVFrame *ref)
Definition: f_zmq.c:150
ctx
AVFormatContext * ctx
Definition: movenc.c:48
Command
Definition: f_sendcmd.c:81
FILTER_INPUTS
#define FILTER_INPUTS(array)
Definition: internal.h:191
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:66
NULL
#define NULL
Definition: coverity.c:32
uninit
static void av_cold uninit(AVFilterContext *ctx)
Definition: f_zmq.c:80
SPACES
#define SPACES
Definition: f_zmq.c:92
NULL_IF_CONFIG_SMALL
#define NULL_IF_CONFIG_SMALL(x)
Return NULL if CONFIG_SMALL is true, otherwise the argument without modification.
Definition: internal.h:117
ZMQContext::bind_address
char * bind_address
Definition: f_zmq.c:39
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:121
recv_msg
static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
Definition: f_zmq.c:116
AVERROR_EXTERNAL
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:59
internal.h
bprint.h
FLAGS
#define FLAGS
Definition: f_zmq.c:44
AVFilterPad::name
const char * name
Pad name.
Definition: internal.h:56
AVFilter
Filter definition.
Definition: avfilter.h:165
ret
ret
Definition: filter_design.txt:187
Command::arg
char * arg
Definition: f_sendcmd.c:83
ZMQContext::command_count
int command_count
Definition: f_zmq.c:40
avfilter.h
ref
static int ref[MAX_W *MAX_W]
Definition: jpeg2000dwt.c:107
ZMQContext::zmq
void * zmq
Definition: f_zmq.c:37
init
static av_cold int init(AVFilterContext *ctx)
Definition: f_zmq.c:51
AVFilterContext
An instance of a filter.
Definition: avfilter.h:402
AVMEDIA_TYPE_VIDEO
@ AVMEDIA_TYPE_VIDEO
Definition: avutil.h:201
audio.h
FILTER_OUTPUTS
#define FILTER_OUTPUTS(array)
Definition: internal.h:192
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
avfilter_graph_send_command
int avfilter_graph_send_command(AVFilterGraph *graph, const char *target, const char *cmd, const char *arg, char *res, int res_len, int flags)
Send a command to one or more filter instances.
Definition: avfiltergraph.c:1174
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:28
ff_vf_zmq
const AVFilter ff_vf_zmq
avstring.h
AV_OPT_TYPE_STRING
@ AV_OPT_TYPE_STRING
Definition: opt.h:228
AVFILTER_DEFINE_CLASS_EXT
AVFILTER_DEFINE_CLASS_EXT(zmq, "(a)zmq", options)
OFFSET
#define OFFSET(x)
Definition: f_zmq.c:43