Basic Usage

Learn the core concepts and components of DeepStream to build sophisticated video analytics pipelines.

Pipeline Architecture

DeepStream pipelines are built using GStreamer elements. Understanding the pipeline structure is essential for building custom applications.

Basic Pipeline Structure

Source → Decode → Preprocess → Inference → Postprocess → Output

Multi-Stream Pipeline

Source 1 ──┐
Source 2 ──┼─→ Mux → Inference → Tracking → Tiler → OSD → Sink
Source N ──┘
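
Before wiring elements one by one, the same topology can be prototyped with Gst.parse_launch. The sketch below is illustrative, not authoritative: the URIs are placeholders and config_infer_primary.txt is assumed to exist in the working directory.

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)

# Two file sources batched by nvstreammux, then inference, tiling, OSD, display.
# URIs and the config file path are placeholders; substitute your own.
pipeline = Gst.parse_launch(
    "uridecodebin uri=file:///path/to/video1.mp4 ! m.sink_0 "
    "uridecodebin uri=file:///path/to/video2.mp4 ! m.sink_1 "
    "nvstreammux name=m batch-size=2 width=1920 height=1080 ! "
    "nvinfer config-file-path=config_infer_primary.txt ! "
    "nvmultistreamtiler rows=1 columns=2 ! nvvideoconvert ! nvdsosd ! nveglglessink"
)
pipeline.set_state(Gst.State.PLAYING)
# A GLib.MainLoop (as in the complete example later on this page) keeps it running.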

Core GStreamer Plugins

Input Plugins

filesrc: Read from file

source = Gst.ElementFactory.make("filesrc", "file-source")
source.set_property('location', '/path/to/video.mp4')

uridecodebin: Universal decoder for various sources

source = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
source.set_property('uri', 'file:///path/to/video.mp4')
# or
source.set_property('uri', 'rtsp://192.168.1.100:554/stream')

v4l2src: USB/V4L2 camera

source = Gst.ElementFactory.make("v4l2src", "usb-cam-source")
source.set_property('device', '/dev/video0')

DeepStream Plugins

nvstreammux: Stream multiplexer (batching)

streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
streammux.set_property('width', 1920)
streammux.set_property('height', 1080)
streammux.set_property('batch-size', 4)
streammux.set_property('batched-push-timeout', 4000000) # µs to wait before pushing a partial batch
streammux.set_property('live-source', 1) # For RTSP/live sources

nvinfer: Primary or secondary inference

pgie = Gst.ElementFactory.make("nvinfer", "primary-nvinference-engine")
pgie.set_property('config-file-path', 'config_infer_primary.txt')

# For secondary inference
sgie = Gst.ElementFactory.make("nvinfer", "secondary-nvinference-engine")
sgie.set_property('config-file-path', 'config_infer_secondary.txt')

nvtracker: Object tracking

tracker = Gst.ElementFactory.make("nvtracker", "tracker")
tracker.set_property('ll-lib-file', '/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so')
tracker.set_property('ll-config-file', 'config_tracker.txt')

nvmultistreamtiler: Composites batched streams into a single tiled frame

tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
tiler.set_property('rows', 2)
tiler.set_property('columns', 2)
tiler.set_property('width', 1920)
tiler.set_property('height', 1080)

nvvideoconvert: Color space conversion

nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")

nvdsosd: On-screen display

nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
nvosd.set_property('process-mode', 1) # 0=CPU, 1=GPU, 2=VIC
nvosd.set_property('display-text', 1)
nvosd.set_property('display-bbox', 1)
nvosd.set_property('display-mask', 1)

Output Plugins

nveglglessink: Display on screen

sink = Gst.ElementFactory.make("nveglglessink", "video-renderer")
sink.set_property('sync', 0)

filesink: Save to file

sink = Gst.ElementFactory.make("filesink", "file-sink")
sink.set_property('location', 'output.mp4')
sink.set_property('sync', 0)
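
Note that filesink writes whatever bytes reach it; to produce a playable MP4 the pipeline needs an encoder, parser, and muxer in front of the sink. A minimal sketch of that tail, assuming the nvv4l2h264enc hardware encoder is available on your platform:

# ... → nvvideoconvert → nvv4l2h264enc → h264parse → qtmux → filesink
nvvidconv2 = Gst.ElementFactory.make("nvvideoconvert", "convertor2")
encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
parser = Gst.ElementFactory.make("h264parse", "parser")
muxer = Gst.ElementFactory.make("qtmux", "muxer")
sink = Gst.ElementFactory.make("filesink", "file-sink")
sink.set_property('location', 'output.mp4')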

nvmsgbroker: Send metadata to message broker

msgbroker = Gst.ElementFactory.make("nvmsgbroker", "msgbroker")
msgbroker.set_property('proto-lib', '/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so')
msgbroker.set_property('config', 'kafka_config.txt')
msgbroker.set_property('topic', 'metadata')
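
In practice nvmsgbroker is paired with nvmsgconv, which serializes event metadata (NvDsEventMsgMeta) into the JSON payload the broker transmits. A sketch of the usual pairing, reusing the msgbroker element above; the converter config path is a placeholder:

msgconv = Gst.ElementFactory.make("nvmsgconv", "msgconv")
msgconv.set_property('config', 'msgconv_config.txt')  # placeholder path
msgconv.set_property('payload-type', 0)  # 0 = full DeepStream schema, 1 = minimal

# Typical tail: ... → nvmsgconv → nvmsgbroker
msgconv.link(msgbroker)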

Configuration Files

DeepStream uses configuration files for complex settings. Here are the main types:

Primary Inference Config

config_infer_primary.txt
[property]
gpu-id=0
net-scale-factor=0.0039215697906911373
model-color-format=0
custom-network-config=yolov4.cfg
model-file=yolov4.weights
model-engine-file=yolov4_b1_gpu0_fp32.engine
labelfile-path=labels.txt
batch-size=1
network-mode=0
num-detected-classes=80
interval=0
gie-unique-id=1
network-type=0

[class-attrs-all]
pre-cluster-threshold=0.25
eps=0.2
group-threshold=1
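
Many [property] keys correspond to nvinfer element properties, and values set on the element take precedence over the config file. A sketch, reusing the pgie element from the nvinfer snippet above:

pgie.set_property('config-file-path', 'config_infer_primary.txt')
pgie.set_property('batch-size', 4)  # overrides batch-size=1 from the file
pgie.set_property('interval', 1)    # skip inference on every other batch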

Tracker Config

config_tracker.txt
[tracker]
tracker-width=640
tracker-height=384
ll-lib-file=/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so
ll-config-file=config_tracker_NvDCF_perf.yml
enable-batch-process=1
enable-past-frame=1

Message Broker Config (Kafka)

kafka_config.txt
[message-broker]
proto-lib=/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so
conn-str=localhost;9092
topic=deepstream-events
partition-key=sensor-id

[message-consumer]
enable=0

Metadata Management

Metadata is the core of DeepStream applications. It flows through the pipeline attached to each frame.

Metadata Hierarchy

NvDsBatchMeta
└─ NvDsFrameMeta (per frame)
   ├─ NvDsObjectMeta (per detected object)
   │  ├─ NvDsClassifierMeta (per secondary classification)
   │  └─ NvDsUserMeta (custom metadata)
   └─ NvDsDisplayMeta (display metadata)

Accessing Metadata

def osd_sink_pad_buffer_probe(pad, info, u_data):
    gst_buffer = info.get_buffer()
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))

    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        # Access frame-level info
        frame_number = frame_meta.frame_num
        source_id = frame_meta.source_id
        num_objects = frame_meta.num_obj_meta

        # Iterate through objects
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break

            # Object properties
            obj_id = obj_meta.object_id
            class_id = obj_meta.class_id
            confidence = obj_meta.confidence
            label = obj_meta.obj_label

            # Bounding box
            left = obj_meta.rect_params.left
            top = obj_meta.rect_params.top
            width = obj_meta.rect_params.width
            height = obj_meta.rect_params.height

            # Tracking info (if tracker is used)
            if obj_meta.tracker_confidence > 0:
                track_id = obj_meta.object_id

            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

Adding Custom Metadata

def add_custom_metadata(frame_meta, custom_data):
    # Acquire user meta from the batch meta pool
    user_meta = pyds.nvds_acquire_user_meta_from_pool(frame_meta.base_meta.batch_meta)

    if user_meta:
        user_meta.user_meta_data = custom_data
        user_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_USER_META

        # Set callbacks for copy and cleanup
        user_meta.base_meta.release_func = release_user_meta
        user_meta.base_meta.copy_func = copy_user_meta

        # Attach to frame meta
        pyds.nvds_add_user_meta_to_frame(frame_meta, user_meta)

def release_user_meta(data, user_data):
    # Cleanup function
    pass

def copy_user_meta(data, user_data):
    # Copy function for metadata
    return data
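
Downstream, the attached metadata can be read back from the frame's user-meta list. A minimal sketch, assuming it runs inside a buffer probe like the one above:

l_user = frame_meta.frame_user_meta_list
while l_user is not None:
    user_meta = pyds.NvDsUserMeta.cast(l_user.data)
    if user_meta.base_meta.meta_type == pyds.NvDsMetaType.NVDS_USER_META:
        custom_data = user_meta.user_meta_data  # whatever add_custom_metadata attached
    try:
        l_user = l_user.next
    except StopIteration:
        break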

Building a Complete Pipeline

Here's a complete example combining multiple streams with tracking:

multi_stream_tracker.py
#!/usr/bin/env python3

import sys
sys.path.append('/opt/nvidia/deepstream/deepstream/lib')

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
import pyds

# Source URIs
sources = [
    "file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264",
    "file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4",
]

def create_source_bin(index, uri):
    """Create source bin for each stream"""
    bin_name = f"source-bin-{index}"
    nbin = Gst.Bin.new(bin_name)

    # Create elements
    uri_decode_bin = Gst.ElementFactory.make("uridecodebin", f"uri-decode-bin-{index}")
    uri_decode_bin.set_property("uri", uri)

    # Add to bin
    nbin.add(uri_decode_bin)

    # Connect to pad-added signal
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)

    # Create ghost pad; it must be named "src" so cb_newpad and main() can find it
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        print("Failed to add ghost pad in source bin")
        return None

    return nbin

def cb_newpad(decodebin, decoder_src_pad, data):
    """Callback for new pad"""
    caps = decoder_src_pad.get_current_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()

    # Only link video pads
    if gstname.find("video") != -1:
        bin_ghost_pad = data.get_static_pad("src")
        if not bin_ghost_pad.set_target(decoder_src_pad):
            print("Failed to link decoder src pad to source bin ghost pad")

def decodebin_child_added(child_proxy, Object, name, user_data):
    """Configure decoder"""
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)
    if name.find("nvv4l2decoder") != -1:
        Object.set_property("cudadec-memtype", 0)

def main():
    # Initialize GStreamer
    Gst.init(None)

    # Create pipeline
    pipeline = Gst.Pipeline()

    # Create elements
    streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    sink = Gst.ElementFactory.make("nveglglessink", "video-renderer")

    if not all([pipeline, streammux, pgie, tracker, nvvidconv, nvosd, tiler, sink]):
        print("Unable to create elements")
        return 1

    # Set properties
    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', len(sources))
    streammux.set_property('batched-push-timeout', 4000000)

    pgie.set_property('config-file-path',
        '/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.txt')

    tracker.set_property('ll-lib-file',
        '/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so')
    tracker.set_property('ll-config-file',
        '/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml')

    tiler.set_property('rows', 1)
    tiler.set_property('columns', 2)
    tiler.set_property('width', 1920)
    tiler.set_property('height', 1080)

    sink.set_property('sync', 0)

    # Add elements to pipeline
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(tracker)
    pipeline.add(tiler)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)

    # Create and add source bins
    for i, uri in enumerate(sources):
        source_bin = create_source_bin(i, uri)
        if source_bin:
            pipeline.add(source_bin)
            sinkpad = streammux.get_request_pad(f"sink_{i}")
            srcpad = source_bin.get_static_pad("src")
            srcpad.link(sinkpad)

    # Link elements
    streammux.link(pgie)
    pgie.link(tracker)
    tracker.link(tiler)
    tiler.link(nvvidconv)
    nvvidconv.link(nvosd)
    nvosd.link(sink)

    # Create event loop and bus watch
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    # Start pipeline
    pipeline.set_state(Gst.State.PLAYING)

    try:
        loop.run()
    except KeyboardInterrupt:
        pass

    # Cleanup
    pipeline.set_state(Gst.State.NULL)
    return 0

def bus_call(bus, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        print("End-of-stream")
        loop.quit()
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print(f"Error: {err}: {debug}")
        loop.quit()
    return True

if __name__ == '__main__':
    sys.exit(main())
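
To inspect results, attach the osd_sink_pad_buffer_probe from the metadata section inside main(), before setting the pipeline to PLAYING:

osdsinkpad = nvosd.get_static_pad("sink")
osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)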

Common Patterns

Pattern 1: Save Inference Results

import json

def save_frame_with_detections(frame_meta, output_dir):
    # Frame-level info
    frame_number = frame_meta.frame_num

    # Collect detections for this frame
    detections = []
    l_obj = frame_meta.obj_meta_list
    while l_obj is not None:
        obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
        detections.append({
            'class': obj_meta.obj_label,
            'confidence': obj_meta.confidence,
            'bbox': [obj_meta.rect_params.left, obj_meta.rect_params.top,
                     obj_meta.rect_params.width, obj_meta.rect_params.height]
        })
        l_obj = l_obj.next

    # Save metadata to JSON
    with open(f"{output_dir}/frame_{frame_number}.json", 'w') as f:
        json.dump(detections, f, indent=2)

Pattern 2: ROI Filtering

def is_in_roi(obj_meta, roi):
    """Check if the object's center falls inside the ROI"""
    cx = obj_meta.rect_params.left + obj_meta.rect_params.width / 2
    cy = obj_meta.rect_params.top + obj_meta.rect_params.height / 2

    return (roi['x1'] <= cx <= roi['x2'] and
            roi['y1'] <= cy <= roi['y2'])
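
Inside the object loop of a buffer probe this can, for example, count objects in a zone. A sketch with placeholder coordinates and a hypothetical counter:

roi = {'x1': 0, 'y1': 0, 'x2': 960, 'y2': 1080}  # left half of a 1080p frame

# Inside the while l_obj loop of a probe:
if is_in_roi(obj_meta, roi):
    objects_in_zone += 1  # hypothetical per-frame counter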

Pattern 3: FPS Counter

import time

class FPSCounter:
    def __init__(self):
        self.start_time = time.time()
        self.frame_count = 0

    def update(self):
        self.frame_count += 1
        elapsed = time.time() - self.start_time
        if elapsed > 1.0:
            fps = self.frame_count / elapsed
            print(f"FPS: {fps:.2f}")
            self.frame_count = 0
            self.start_time = time.time()
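
Call update() once per buffer from a probe. Note that after nvstreammux a buffer carries a whole batch, so on a batched pad this measures batches per second rather than frames:

fps_counter = FPSCounter()

def fps_probe(pad, info, u_data):
    fps_counter.update()
    return Gst.PadProbeReturn.OK

# Attach to any pad, e.g. the tiler's src pad:
# tiler.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, fps_probe, 0)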