程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Nvidia Deepstream Python Queue 多線程操作

編輯:Python

Nvidia Deepstream小細節系列:Deepstream Python Queue 多線程操作

本文介紹Queue在Deepstream/Gstreamer中的作用。

環境描述:

  • 本案例運行環境:Jetson NX
  • IDE:VSCode
  • JetPack 4.6.1 GA
  • Deepstream 6.0.1

文章目錄

  • Nvidia Deepstream小細節系列:Deepstream Python Queue 多線程操作


在Deepstream Python的官方案例中,deepstream-test3案例中用到了非常多queue模塊,整個pipeline的代碼如下:

def main(args):
# Check input arguments
if len(args) < 2:
sys.stderr.write("usage: %s <uri1> [uri2] ... [uriN]\n" % args[0])
sys.exit(1)
for i in range(0,len(args)-1):
fps_streams["stream{0}".format(i)]=GETFPS(i)
number_sources=len(args)-1
# Standard GStreamer initialization
GObject.threads_init()
Gst.init(None)
# Create gstreamer elements */
# Create Pipeline element that will form a connection of other elements
print("Creating Pipeline \n ")
pipeline = Gst.Pipeline()
is_live = False
if not pipeline:
sys.stderr.write(" Unable to create Pipeline \n")
print("Creating streamux \n ")
# Create nvstreammux instance to form batches from one or more sources.
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
sys.stderr.write(" Unable to create NvStreamMux \n")
pipeline.add(streammux)
for i in range(number_sources):
print("Creating source_bin ",i," \n ")
uri_name=args[i+1]
if uri_name.find("rtsp://") == 0 :
is_live = True
source_bin=create_source_bin(i, uri_name)
if not source_bin:
sys.stderr.write("Unable to create source bin \n")
pipeline.add(source_bin)
padname="sink_%u" %i
sinkpad= streammux.get_request_pad(padname)
if not sinkpad:
sys.stderr.write("Unable to create sink pad bin \n")
srcpad=source_bin.get_static_pad("src")
if not srcpad:
sys.stderr.write("Unable to create src pad bin \n")
srcpad.link(sinkpad)
queue1=Gst.ElementFactory.make("queue","queue1")
queue2=Gst.ElementFactory.make("queue","queue2")
queue3=Gst.ElementFactory.make("queue","queue3")
queue4=Gst.ElementFactory.make("queue","queue4")
queue5=Gst.ElementFactory.make("queue","queue5")
pipeline.add(queue1)
pipeline.add(queue2)
pipeline.add(queue3)
pipeline.add(queue4)
pipeline.add(queue5)
print("Creating Pgie \n ")
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
sys.stderr.write(" Unable to create pgie \n")
print("Creating tiler \n ")
tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
if not tiler:
sys.stderr.write(" Unable to create tiler \n")
print("Creating nvvidconv \n ")
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
if not nvvidconv:
sys.stderr.write(" Unable to create nvvidconv \n")
print("Creating nvosd \n ")
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
if not nvosd:
sys.stderr.write(" Unable to create nvosd \n")
nvosd.set_property('process-mode',OSD_PROCESS_MODE)
nvosd.set_property('display-text',OSD_DISPLAY_TEXT)
if(is_aarch64()):
print("Creating transform \n ")
transform=Gst.ElementFactory.make("nvegltransform", "nvegl-transform")
if not transform:
sys.stderr.write(" Unable to create transform \n")
print("Creating EGLSink \n")
sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
if not sink:
sys.stderr.write(" Unable to create egl sink \n")
if is_live:
print("Atleast one of the sources is live")
streammux.set_property('live-source', 1)
streammux.set_property('width', 1920)
streammux.set_property('height', 1080)
streammux.set_property('batch-size', number_sources)
streammux.set_property('batched-push-timeout', 4000000)
pgie.set_property('config-file-path', "dstest3_pgie_config.txt")
pgie_batch_size=pgie.get_property("batch-size")
if(pgie_batch_size != number_sources):
print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
pgie.set_property("batch-size",number_sources)
tiler_rows=int(math.sqrt(number_sources))
tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
tiler.set_property("rows",tiler_rows)
tiler.set_property("columns",tiler_columns)
tiler.set_property("width", TILED_OUTPUT_WIDTH)
tiler.set_property("height", TILED_OUTPUT_HEIGHT)
sink.set_property("qos",0)
sink.set_property("sync", 0)
print("Adding elements to Pipeline \n")
pipeline.add(pgie)
pipeline.add(tiler)
pipeline.add(nvvidconv)
pipeline.add(nvosd)
if is_aarch64():
pipeline.add(transform)
pipeline.add(sink)
print("Linking elements in the Pipeline \n")
streammux.link(queue1)
queue1.link(pgie)
pgie.link(queue2)
queue2.link(tiler)
tiler.link(queue3)
queue3.link(nvvidconv)
nvvidconv.link(queue4)
queue4.link(nvosd)
if is_aarch64():
nvosd.link(queue5)
queue5.link(transform)
transform.link(sink)
else:
nvosd.link(queue5)
queue5.link(sink)
# create an event loop and feed gstreamer bus mesages to it
loop = GObject.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect ("message", bus_call, loop)
tiler_src_pad=pgie.get_static_pad("src")
Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
if not tiler_src_pad:
sys.stderr.write(" Unable to get src pad \n")
else:
tiler_src_pad.add_probe(Gst.PadProbeType.BUFFER, tiler_src_pad_buffer_probe, 0)
# List the sources
print("Now playing...")
for i, source in enumerate(args):
if (i != 0):
print(i, ": ", source)
print("Starting pipeline \n")
# start play back and listed to events 
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except:
pass
# cleanup
print("Exiting app\n")
pipeline.set_state(Gst.State.NULL)

我們通過graphviz打印出pipeline的可視化圖,如下:

我們可以局部放大,如下圖。我們發現,在原有pipeline的基礎上,每兩個模塊之間,我們都加了一個queue模塊。而這個queue模塊的輸入輸出在圖中顯示的是Any,說明queue的作用和輸入輸出變化無關。

我們查看Gstreamer的官網,在章節When would you want to force a thread?中解釋了使用queue的原因和作用:

We have seen that threads are created by elements but it is also possible to insert elements in the pipeline for the sole purpose of forcing a new thread in the pipeline. (我們已經看到線程是由element創建的,但也有可能為了強制在pipeline中使用新線程而在pipeline中插入element。)

There are several reasons to force the use of threads. However, for performance reasons, you never want to use one thread for every element out there, since that will create some overhead. Let’s now list some situations where threads can be particularly useful:(強制使用線程有幾個原因。 但是,出於性能原因,您永遠不想為那裡的每個element都使用一個線程,因為這會產生一些開銷。 現在讓我們列出一些線程特別有用的情況:)

  • Data buffering, for example when dealing with network streams or when recording data from a live stream such as a video or audio card. Short hickups elsewhere in the pipeline will not cause data loss. See also Stream buffering about network buffering with queue2.(數據緩沖,例如在處理網絡流或從視頻或音頻卡等實時流記錄數據時。 pipeline中其他地方的短暫中斷不會導致數據丟失。 另請參閱關於使用 queue2 進行網絡緩沖。)

  • Synchronizing output devices, e.g. when playing a stream containing both video and audio data. By using threads for both outputs, they will run independently and their synchronization will be better.(同步輸出設備,例如 播放包含視頻和音頻數據的流時。 通過對兩個輸出使用線程,它們將獨立運行並且它們的同步會更好。)

所以說,queue模塊的作用是強制給pipeline中的某個模塊開線程。在Deepstream Python的使用過程中,個人覺得可以設置一些queue作為數據緩沖。


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved