Basic Usage
Learn the core concepts and components of DeepStream to build sophisticated video analytics pipelines.
Pipeline Architecture
DeepStream pipelines are built using GStreamer elements. Understanding the pipeline structure is essential for building custom applications.
Basic Pipeline Structure
Source → Decode → Preprocess → Inference → Postprocess → Output
Multi-Stream Pipeline
Source 1 ──┐
Source 2 ──┤→ Mux → Inference → Tracking → Tiler → OSD → Sink
Source N ──┘
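For quick experiments, the single-stream structure can also be expressed with Gst.parse_launch. This is a minimal sketch; the URI and config path are placeholders to adjust for your installation:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)

# Mux (batch of 1) -> Inference -> Convert -> OSD -> Sink, fed by uridecodebin
pipeline = Gst.parse_launch(
    "nvstreammux name=m batch-size=1 width=1920 height=1080 ! "
    "nvinfer config-file-path=config_infer_primary.txt ! "
    "nvvideoconvert ! nvdsosd ! nveglglessink "
    "uridecodebin uri=file:///path/to/video.mp4 ! m.sink_0"
)
pipeline.set_state(Gst.State.PLAYING)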
Core GStreamer Plugins
Input Plugins
filesrc: Read raw bytes from a file (pair it with a demuxer/parser and decoder downstream)
source = Gst.ElementFactory.make("filesrc", "file-source")
source.set_property('location', '/path/to/video.mp4')
uridecodebin: Universal decoder for various sources
source = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
source.set_property('uri', 'file:///path/to/video.mp4')
# or
source.set_property('uri', 'rtsp://192.168.1.100:554/stream')
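Because uridecodebin creates its source pads dynamically, it cannot be linked at construction time; link it from the pad-added callback instead (next_element below is a hypothetical downstream element):

def on_pad_added(decodebin, pad):
    # Link each newly exposed pad to the downstream element's sink pad
    sinkpad = next_element.get_static_pad("sink")
    if not sinkpad.is_linked():
        pad.link(sinkpad)

source.connect("pad-added", on_pad_added)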
v4l2src: USB/V4L2 camera
source = Gst.ElementFactory.make("v4l2src", "usb-cam-source")
source.set_property('device', '/dev/video0')
DeepStream Plugins
nvstreammux: Stream multiplexer (batching)
streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
streammux.set_property('width', 1920)
streammux.set_property('height', 1080)
streammux.set_property('batch-size', 4)
streammux.set_property('batched-push-timeout', 4000000) # microseconds to wait before pushing a partial batch
streammux.set_property('live-source', 1) # For RTSP/live sources
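nvstreammux has no static sink pads; request one pad per input stream (sink_0, sink_1, ...) and link each source to it, as the complete example below does:

# Request a muxer sink pad for stream 0 and link the source to it
sinkpad = streammux.get_request_pad("sink_0")
srcpad = source.get_static_pad("src")  # assumes the source exposes a static src pad
srcpad.link(sinkpad)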
nvinfer: Primary or secondary inference
pgie = Gst.ElementFactory.make("nvinfer", "primary-nvinference-engine")
pgie.set_property('config-file-path', 'config_infer_primary.txt')
# For secondary inference
sgie = Gst.ElementFactory.make("nvinfer", "secondary-nvinference-engine")
sgie.set_property('config-file-path', 'config_infer_secondary.txt')
nvtracker: Object tracking
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
tracker.set_property('ll-lib-file', '/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so')
tracker.set_property('ll-config-file', 'config_tracker.txt')
nvmultistreamtiler: Multi-stream tiler
tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
tiler.set_property('rows', 2)
tiler.set_property('columns', 2)
tiler.set_property('width', 1920)
tiler.set_property('height', 1080)
nvvideoconvert: Color space conversion
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
nvdsosd: On-screen display
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
nvosd.set_property('process-mode', 1) # 0=CPU, 1=GPU, 2=VIC
nvosd.set_property('display-text', 1)
nvosd.set_property('display-bbox', 1)
nvosd.set_property('display-mask', 1)
Output Plugins
nveglglessink: Display on screen
sink = Gst.ElementFactory.make("nveglglessink", "video-renderer")
sink.set_property('sync', 0) # render as fast as buffers arrive instead of clock-syncing
filesink: Save to file (writes raw buffers only; see the encode-branch sketch below for a playable MP4)
sink = Gst.ElementFactory.make("filesink", "file-sink")
sink.set_property('location', 'output.mp4')
sink.set_property('sync', 0)
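filesink writes whatever buffers reach it, so a playable MP4 requires an encode branch in front of it. A sketch using standard elements (nvv4l2h264enc assumes an NVIDIA hardware encoder is available):

# Encode branch: convert -> H.264 encode -> parse -> MP4 mux -> file
nvvidconv2 = Gst.ElementFactory.make("nvvideoconvert", "convertor2")
encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
parser = Gst.ElementFactory.make("h264parse", "h264-parser")
container = Gst.ElementFactory.make("qtmux", "muxer")
sink = Gst.ElementFactory.make("filesink", "file-sink")
sink.set_property('location', 'output.mp4')
# Link order: ... ! nvvidconv2 ! encoder ! parser ! container ! sink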
nvmsgbroker: Send metadata to message broker
msgbroker = Gst.ElementFactory.make("nvmsgbroker", "msgbroker")
msgbroker.set_property('proto-lib', '/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so')
msgbroker.set_property('config', 'kafka_config.txt')
msgbroker.set_property('topic', 'metadata')
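nvmsgbroker only transmits payloads; it is normally paired with nvmsgconv, which serializes event metadata (NvDsEventMsgMeta) into the DeepStream message schema. A minimal sketch; the config path is a placeholder:

# nvmsgconv builds the schema payload that nvmsgbroker sends
msgconv = Gst.ElementFactory.make("nvmsgconv", "msgconv")
msgconv.set_property('config', 'msgconv_config.txt')
msgconv.set_property('payload-type', 0)  # 0 = full DeepStream schema
# Link order: ... ! msgconv ! msgbroker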
Configuration Files
DeepStream uses configuration files for complex settings. Here are the main types:
Primary Inference Config
[property]
gpu-id=0
# 1/255: scale raw pixel values into the [0,1] range
net-scale-factor=0.0039215697906911373
# 0=RGB, 1=BGR
model-color-format=0
custom-network-config=yolov4.cfg
model-file=yolov4.weights
model-engine-file=yolov4_b1_gpu0_fp32.engine
labelfile-path=labels.txt
batch-size=1
# Inference precision: 0=FP32, 1=INT8, 2=FP16
network-mode=0
num-detected-classes=80
# 0 = run inference on every batch; n = skip n batches between inferences
interval=0
gie-unique-id=1
# 0=Detector, 1=Classifier, 2=Segmentation
network-type=0

[class-attrs-all]
# Minimum confidence a detection needs before clustering
pre-cluster-threshold=0.25
eps=0.2
group-threshold=1
Tracker Config
[tracker]
# Resolution at which the tracker runs (lower than stream resolution for speed)
tracker-width=640
tracker-height=384
ll-lib-file=/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so
ll-config-file=config_tracker_NvDCF_perf.yml
enable-batch-process=1
enable-past-frame=1
Message Broker Config (Kafka)
[message-broker]
proto-lib=/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so
# Connection string format: host;port
conn-str=localhost;9092
topic=deepstream-events
partition-key=sensor-id
[message-consumer]
enable=0
Metadata Management
Metadata is the core of DeepStream applications. It flows through the pipeline attached to each frame.
Metadata Hierarchy
NvDsBatchMeta
└─ NvDsFrameMeta (per frame)
├─ NvDsObjectMeta (per detected object)
│ ├─ NvDsClassifierMeta (per secondary classification)
│ └─ NvDsUserMeta (custom metadata)
└─ NvDsDisplayMeta (display metadata)
Accessing Metadata
def osd_sink_pad_buffer_probe(pad, info, u_data):
    gst_buffer = info.get_buffer()
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        # Access frame-level info
        frame_number = frame_meta.frame_num
        source_id = frame_meta.source_id
        num_objects = frame_meta.num_obj_meta
        # Iterate through objects
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            # Object properties
            obj_id = obj_meta.object_id
            class_id = obj_meta.class_id
            confidence = obj_meta.confidence
            label = obj_meta.obj_label
            # Bounding box
            left = obj_meta.rect_params.left
            top = obj_meta.rect_params.top
            width = obj_meta.rect_params.width
            height = obj_meta.rect_params.height
            # Tracking info (if tracker is used)
            if obj_meta.tracker_confidence > 0:
                track_id = obj_meta.object_id
            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK
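Attach the probe to the sink pad of the OSD element so it sees fully processed metadata (the same pattern the DeepStream Python samples use):

osdsinkpad = nvosd.get_static_pad("sink")
osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)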
Adding Custom Metadata
def add_custom_metadata(frame_meta, custom_data):
    # Acquire user meta from the batch-level pool.
    # Note: custom_data must be a structure allocated through the pyds
    # bindings (e.g. pyds.alloc_nvds_event_msg_meta), not an arbitrary
    # Python object.
    batch_meta = frame_meta.base_meta.batch_meta
    user_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
    if user_meta:
        user_meta.user_meta_data = custom_data
        user_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_USER_META
        # Register copy/release callbacks through the bindings; assigning
        # directly to base_meta function pointers is not supported from Python
        pyds.user_copyfunc(user_meta, copy_user_meta)
        pyds.user_releasefunc(user_meta, release_user_meta)
        # Add to frame meta
        pyds.nvds_add_user_meta_to_frame(frame_meta, user_meta)

def release_user_meta(data, user_data):
    # Cleanup function, called when the metadata is destroyed
    pass

def copy_user_meta(data, user_data):
    # Copy function, called when the metadata is duplicated downstream
    return data
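Downstream, the custom entries can be read back from the frame's user-meta list. A minimal sketch, assuming the probe pattern shown above:

l_user = frame_meta.frame_user_meta_list
while l_user is not None:
    user_meta = pyds.NvDsUserMeta.cast(l_user.data)
    if user_meta.base_meta.meta_type == pyds.NvDsMetaType.NVDS_USER_META:
        custom_data = user_meta.user_meta_data
        # ... use custom_data here ...
    l_user = l_user.next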
Building a Complete Pipeline
Here's a complete example combining multiple streams with tracking:
#!/usr/bin/env python3
import sys
sys.path.append('/opt/nvidia/deepstream/deepstream/lib')
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
import pyds
# Source URIs
sources = [
    "file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264",
    "file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
]
def create_source_bin(index, uri):
    """Create a source bin for each stream"""
    bin_name = f"source-bin-{index}"
    nbin = Gst.Bin.new(bin_name)
    # Create elements
    uri_decode_bin = Gst.ElementFactory.make("uridecodebin", f"uri-decode-bin-{index}")
    uri_decode_bin.set_property("uri", uri)
    # Add to bin
    nbin.add(uri_decode_bin)
    # Connect to pad-added signal
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)
    # Create a targetless ghost pad named "src"; its target is set in
    # cb_newpad once the decoder exposes its video pad
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        print("Failed to add ghost pad in source bin")
        return None
    return nbin
def cb_newpad(decodebin, decoder_src_pad, data):
    """Callback for new pad"""
    caps = decoder_src_pad.get_current_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    # Only link video pads
    if gstname.find("video") != -1:
        bin_ghost_pad = data.get_static_pad("src")
        if not bin_ghost_pad.set_target(decoder_src_pad):
            print("Failed to link decoder src pad to source bin ghost pad")
def decodebin_child_added(child_proxy, Object, name, user_data):
    """Configure decoder elements as they are created"""
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)
    if name.find("nvv4l2decoder") != -1:
        # 0 = device (NVMM) memory for decoded buffers
        Object.set_property("cudadec-memtype", 0)
def main():
    # Initialize GStreamer
    Gst.init(None)
    # Create pipeline
    pipeline = Gst.Pipeline()
    # Create elements
    streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    sink = Gst.ElementFactory.make("nveglglessink", "video-renderer")
    if not all([pipeline, streammux, pgie, tracker, nvvidconv, nvosd, tiler, sink]):
        print("Unable to create elements")
        return 1
    # Set properties
    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', len(sources))
    streammux.set_property('batched-push-timeout', 4000000)
    pgie.set_property('config-file-path',
                      '/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.txt')
    tracker.set_property('ll-lib-file',
                         '/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so')
    tracker.set_property('ll-config-file',
                         '/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml')
    tiler.set_property('rows', 1)
    tiler.set_property('columns', 2)
    tiler.set_property('width', 1920)
    tiler.set_property('height', 1080)
    sink.set_property('sync', 0)
    # Add elements to pipeline
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(tracker)
    pipeline.add(tiler)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)
    # Create and add source bins, one muxer request pad per stream
    for i, uri in enumerate(sources):
        source_bin = create_source_bin(i, uri)
        if source_bin:
            pipeline.add(source_bin)
            sinkpad = streammux.get_request_pad(f"sink_{i}")
            srcpad = source_bin.get_static_pad("src")
            srcpad.link(sinkpad)
    # Link elements
    streammux.link(pgie)
    pgie.link(tracker)
    tracker.link(tiler)
    tiler.link(nvvidconv)
    nvvidconv.link(nvosd)
    nvosd.link(sink)
    # Create event loop and bus watch
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)
    # Start pipeline
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except KeyboardInterrupt:
        pass
    # Cleanup
    pipeline.set_state(Gst.State.NULL)
    return 0
def bus_call(bus, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        print("End-of-stream")
        loop.quit()
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print(f"Error: {err}: {debug}")
        loop.quit()
    return True

if __name__ == '__main__':
    sys.exit(main())
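nveglglessink needs an X/EGL display. On a headless machine, one option is to swap in fakesink so the pipeline still runs for metadata processing; a minimal sketch:

# Assumption: no display attached; discard rendered frames
sink = Gst.ElementFactory.make("fakesink", "fake-sink")
sink.set_property('sync', 0)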
Next Steps
- Python Bindings: Deep dive into Python API
- Model Deployment: Deploy custom models
- Best Practices: Performance optimization tips
Common Patterns
Pattern 1: Save Inference Results
import json

def save_frame_with_detections(frame_meta, output_dir):
    # Collect per-object metadata for this frame
    frame_number = frame_meta.frame_num
    detections = []
    l_obj = frame_meta.obj_meta_list
    while l_obj:
        obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
        detections.append({
            'class': obj_meta.obj_label,
            'confidence': obj_meta.confidence,
            'bbox': [obj_meta.rect_params.left, obj_meta.rect_params.top,
                     obj_meta.rect_params.width, obj_meta.rect_params.height]
        })
        l_obj = l_obj.next
    # Save metadata to JSON
    with open(f"{output_dir}/frame_{frame_number}.json", 'w') as f:
        json.dump(detections, f, indent=2)
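Call it once per frame from inside a pad probe; the output directory (hypothetical here) must already exist:

# Inside osd_sink_pad_buffer_probe, after casting frame_meta:
save_frame_with_detections(frame_meta, "./detections")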
Pattern 2: ROI Filtering
def is_in_roi(obj_meta, roi):
    """Check if the object's center point falls inside the ROI"""
    cx = obj_meta.rect_params.left + obj_meta.rect_params.width / 2
    cy = obj_meta.rect_params.top + obj_meta.rect_params.height / 2
    return (roi['x1'] <= cx <= roi['x2'] and
            roi['y1'] <= cy <= roi['y2'])
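A sketch of applying the filter inside a probe's object loop; the ROI coordinates are illustrative (in muxer output pixels) and objects_in_roi is a hypothetical counter:

# Hypothetical ROI covering the left half of a 1920x1080 frame
roi = {'x1': 0, 'y1': 0, 'x2': 960, 'y2': 1080}
if is_in_roi(obj_meta, roi):
    objects_in_roi += 1  # e.g. count only objects inside the region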
Pattern 3: FPS Counter
import time

class FPSCounter:
    def __init__(self):
        self.start_time = time.time()
        self.frame_count = 0

    def update(self):
        self.frame_count += 1
        elapsed = time.time() - self.start_time
        if elapsed > 1.0:
            fps = self.frame_count / elapsed
            print(f"FPS: {fps:.2f}")
            self.frame_count = 0
            self.start_time = time.time()
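Instantiate one counter (per stream, if measuring streams separately) and call update() once per frame from the probe's frame loop:

fps_counter = FPSCounter()
# Inside osd_sink_pad_buffer_probe, once per frame_meta:
fps_counter.update()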