7/19/2019

Connecting DeepStream SDK 3.0 to Kafka

1. Configuring and testing the DeepStream SDK 3.0 message components


  • Basic installation

See the DeepStream SDK 3.0 installation post linked below.
  https://ahyuo79.blogspot.com/2019/06/deepstream-sdk-30-jetpack-42-411.html


NVIDIA msgbroker references
  https://devtalk.nvidia.com/default/topic/1048534/deepstream-for-tesla/deepstream-test4-app-stalled-after-the-first-few-frames/
  https://devtalk.nvidia.com/default/topic/1044365/deepstream-for-tesla/using-kafka-protocol-for-retrieving-data-from-a-deepstream-pipeline-/

NVIDIA Analytics Server (Smart Parking System)
  https://github.com/NVIDIA-AI-IOT/deepstream_360_d_smart_parking_application/

Kafka Site
  https://kafka.apache.org/intro.html

Understanding the Apache Kafka architecture
  https://epicdevs.com/17
  https://www.joinc.co.kr/w/man/12/Kafka/chatting

1.1 DeepStream SDK 3.0 Test 4

See section 3.4, DeepStream Test 4 (Kafka), of the post linked below.
  https://ahyuo79.blogspot.com/2019/06/deepstream-sdk-30-jetpack-42-411.html



$ vi deepstream_test4_app.c
#define PROTOCOL_ADAPTOR_LIB  "/usr/lib/aarch64-linux-gnu/tegra/libnvds_kafka_proto.so"
#define CONNECTION_STRING "127.0.0.1;9092;dsapp2"

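  1. First, install Kafka as described in section 2 below.
  2. Once it is installed, create the topic dsapp2.
  3. Rebuild the modified source above with make, then run deepstream-test4-app.
  4. Check the messages received on that topic with bin/kafka-console-consumer.sh.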
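
The CONNECTION_STRING above packs three fields into one semicolon-separated value. As a minimal sketch, assuming the layout is host;port;topic (which matches broker port 9092 and the dsapp2 topic used in these steps), the value can be split and sanity-checked before rebuilding the app. The names KafkaTarget and parse_connection_string are hypothetical helpers for illustration, not part of DeepStream.

```python
from typing import NamedTuple


class KafkaTarget(NamedTuple):
    """Fields assumed to be encoded in the connection string."""
    host: str
    port: int
    topic: str


def parse_connection_string(conn: str) -> KafkaTarget:
    """Split an assumed "host;port;topic" string and validate each field."""
    parts = [p.strip() for p in conn.split(";")]
    if len(parts) != 3:
        raise ValueError(f"expected host;port;topic, got {conn!r}")
    host, port_text, topic = parts
    if not host or not topic:
        raise ValueError(f"host and topic must be non-empty in {conn!r}")
    port = int(port_text)  # raises ValueError if the port is not numeric
    if not 1 <= port <= 65535:
        raise ValueError(f"port out of range: {port}")
    return KafkaTarget(host, port, topic)


target = parse_connection_string("127.0.0.1;9092;dsapp2")
# Print the consumer command that would read back the same topic.
print(
    "bin/kafka-console-consumer.sh "
    f"--bootstrap-server {target.host}:{target.port} --topic {target.topic}"
)
```

Catching a mistyped port or an empty topic here is cheaper than finding it after the pipeline has started.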


This source is an example that uses Gst-msgconv and Gst-msgbroker together.
Gst-msgconv is given a separate config file, dstest4_msgconv_config.txt, and the code that applies this setting is in the source above.
Gst-msgbroker sends the metadata it receives to Kafka; at present every message is in JSON format.

$ cd /usr/local/kafka
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dsapp2 --from-beginning            // prints the metadata sent by deepstream-test4-app as JSON
{
  "messageid" : "220a8cea-9dc5-448c-8da6-cc9998a5fefe",
  "mdsversion" : "1.0",
  "@timestamp" : "2019-07-22T02:01:54.067Z",
  "place" : {
    "id" : "1",
    "name" : "XYZ",
    "type" : "garage",
    "location" : {
      "lat" : 30.32,
      "lon" : -40.549999999999997,
      "alt" : 100.0
    },
    "aisle" : {
      "id" : "walsh",
      "name" : "lane1",
      "level" : "P2",
      "coordinate" : {
        "x" : 1.0,
        "y" : 2.0,
        "z" : 3.0
      }
    }
  },
  "sensor" : {
    "id" : "CAMERA_ID",
    "type" : "Camera",
    "description" : "\"Entrance of Garage Right Lane\"",
    "location" : {
      "lat" : 45.293701446999997,
      "lon" : -75.830391449900006,
      "alt" : 48.155747933800001
    },
    "coordinate" : {
      "x" : 5.2000000000000002,
      "y" : 10.1,
      "z" : 11.199999999999999
    }
  },
  "analyticsModule" : {
    "id" : "XYZ",
    "description" : "\"Vehicle Detection and License Plate Recognition\"",
    "source" : "OpenALR",
    "version" : "1.0",
    "confidence" : 0.0
  },
  "object" : {
    "id" : "db521849-00f8-44f6-b20c-daf3111b6ccd",
    "speed" : 0.0,
    "direction" : 0.0,
    "orientation" : 0.0,
    "vehicle" : {
      "type" : "sedan",
      "make" : "Bugatti",
      "model" : "M",
      "color" : "blue",
      "licenseState" : "CA",
      "license" : "XX1234",
      "confidence" : 0.0
    },
    "bbox" : {
      "topleftx" : 0,
      "toplefty" : 0,
      "bottomrightx" : 0,
      "bottomrighty" : 0
    },
    "location" : {
      "lat" : 0.0,
      "lon" : 0.0,
      "alt" : 0.0
    },
    "coordinate" : {
      "x" : 0.0,
      "y" : 0.0,
      "z" : 0.0
    }
  },
  "event" : {
    "id" : "b63d0d65-eaf8-44c5-be51-979a009f8a33",
    "type" : "moving"
  },
  "videoPath" : ""
}
....   // stop with Ctrl+C
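
Because each message arrives as a multi-line JSON document, reading the consumer output line by line does not work. The sketch below, which uses only the Python standard library, walks a block of captured text with json.JSONDecoder.raw_decode and pulls out a few fields. SAMPLE is a shortened copy of the message above, and iter_messages and summarize are hypothetical helper names.

```python
import json

# Shortened copy of the message shown above (only the fields used here).
SAMPLE = """
{
  "messageid" : "220a8cea-9dc5-448c-8da6-cc9998a5fefe",
  "@timestamp" : "2019-07-22T02:01:54.067Z",
  "sensor" : { "id" : "CAMERA_ID", "type" : "Camera" },
  "object" : {
    "vehicle" : { "type" : "sedan", "make" : "Bugatti", "license" : "XX1234" }
  },
  "event" : { "id" : "b63d0d65-eaf8-44c5-be51-979a009f8a33", "type" : "moving" }
}
"""


def iter_messages(text):
    """Yield each JSON document found back to back in `text`."""
    decoder = json.JSONDecoder()
    pos = 0
    while True:
        # raw_decode does not skip leading whitespace, so do it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            return
        doc, pos = decoder.raw_decode(text, pos)
        yield doc


def summarize(doc):
    """Pick out the fields that identify the sensor, the event and the vehicle."""
    vehicle = doc.get("object", {}).get("vehicle", {})
    return {
        "timestamp": doc.get("@timestamp"),
        "sensor": doc.get("sensor", {}).get("id"),
        "event": doc.get("event", {}).get("type"),
        "vehicle": vehicle.get("type"),
        "license": vehicle.get("license"),
    }


for message in iter_messages(SAMPLE + SAMPLE):
    print(summarize(message))
```

The same loop could be fed text captured from kafka-console-consumer.sh, provided the capture contains only the JSON documents.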



2.  Installing and testing Apache Kafka

Apache Kafka requires Apache ZooKeeper, so install ZooKeeper first as follows.

  • Run apt update and install a Java JDK or default-jre
Kafka needs a JDK or JRE in order to run.

$ sudo apt-get update
$ sudo apt-get install openjdk-8-jdk


2.1 Basic Apache ZooKeeper installation


ZooKeeper site
  https://zookeeper.apache.org/releases.html

Package download
  https://www-eu.apache.org/dist/zookeeper/
  http://www-us.apache.org/dist/zookeeper
  http://apache.mirror.globo.tech/zookeeper


  • Install ZooKeeper (using apt)
$ sudo apt-get install zookeeperd

$ find / -name zkServer.sh 2> /dev/null 
/usr/share/zookeeper/bin/zkServer.sh


  • Install from the downloaded package
The apt package appears to install under /usr/share, so this alternative places the downloaded release under /usr/local instead.

$ wget https://www-eu.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
$ tar zxvf zookeeper-3.4.14.tar.gz
$ sudo mv zookeeper-3.4.14/ /usr/local/zookeeper
$ sudo chown -R root:root /usr/local/zookeeper

$ find / -name zkServer.sh 2> /dev/null      // a newer version, so the layout differs slightly
/usr/local/zookeeper/zookeeper-contrib/zookeeper-contrib-rest/src/test/zkServer.sh
/usr/local/zookeeper/zookeeper-contrib/zookeeper-contrib-zkpython/src/test/zkServer.sh
/usr/local/zookeeper/zookeeper-client/zookeeper-client-c/tests/zkServer.sh
/usr/local/zookeeper/bin/zkServer.sh
/usr/local/zookeeper/zookeeper-recipes/zookeeper-recipes-lock/src/main/c/tests/zkServer.sh
/usr/local/zookeeper/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/zkServer.sh


2.2 Registering Apache ZooKeeper as a service

For convenience, register ZooKeeper as a systemd service as follows.

  • Register zookeeper.service with systemd

$ sudo vi /etc/systemd/system/zookeeper.service
[Unit]
Description=zookeeper-server
After=network.target

[Service]
Type=forking
User=root
Group=root
SyslogIdentifier=zookeeper-server
WorkingDirectory=/usr/share/zookeeper
Restart=on-failure
RestartSec=0s
ExecStart=/usr/share/zookeeper/bin/zkServer.sh start
ExecStop=/usr/share/zookeeper/bin/zkServer.sh stop

[Install]
WantedBy=multi-user.target


  • Enable the service and check it

$ sudo systemctl daemon-reload
$ sudo systemctl enable zookeeper.service
$ sudo systemctl start zookeeper.service
$ sudo systemctl status zookeeper.service


  • Check ZooKeeper status

$ netstat -nlp|grep 2181
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 :::2181                 :::*                    LISTEN      -  

$ sudo /usr/share/zookeeper/bin/zkServer.sh status     
ZooKeeper JMX enabled by default
Using config: /etc/zookeeper/conf/zoo.cfg
Error contacting service. It is probably not running.

or 

$ echo stat | nc 127.0.0.1 2181
Zookeeper version: 3.4.10-3--1, built on Sat, 03 Feb 2018 14:58:02 -0800
Clients:
 /127.0.0.1:45894[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0/0
Received: 3
Sent: 2
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: standalone
Node count: 4


2.3  Installing Apache Kafka and registering it as a service


Apache Kafka
  https://kafka.apache.org/

Package download (check which versions are available)
  https://www-eu.apache.org/dist/kafka
  http://www-us.apache.org/dist/kafka
  http://apache.mirror.globo.tech/kafka


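The version numbering was confusing while downloading, but the download sites made it clear.
Kafka was originally developed in Scala, and parts of it were reportedly moved to Java because of compatibility problems.
The file name therefore carries two version numbers: the Scala version the release was built with, followed by the Kafka version.

e.g. kafka/2.2.1/kafka_2.12-2.2.1.tgz  (Scala: 2.12, Kafka: 2.2.1)

The download sites currently offer only Kafka 2.x.x, so the steps here are based on that version.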
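
To make the naming rule concrete, here is a small sketch that splits an archive name into its two versions. It assumes the kafka_<scala>-<kafka>.tgz pattern shown in the example; parse_kafka_archive is a hypothetical helper name.

```python
import re
from pathlib import PurePosixPath

# kafka_<scala version>-<kafka version>.tgz, e.g. kafka_2.12-2.2.1.tgz
_ARCHIVE = re.compile(r"^kafka_(?P<scala>\d+\.\d+)-(?P<kafka>\d+(?:\.\d+)+)\.tgz$")


def parse_kafka_archive(path):
    """Return (scala_version, kafka_version) for a Kafka release archive."""
    name = PurePosixPath(path).name  # drop any directory or URL prefix
    match = _ARCHIVE.match(name)
    if match is None:
        raise ValueError(f"not a Kafka release archive name: {name!r}")
    return match.group("scala"), match.group("kafka")


scala, kafka = parse_kafka_archive("kafka/2.2.1/kafka_2.12-2.2.1.tgz")
print(f"Scala {scala}, Kafka {kafka}")
```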


  • Download Kafka and run a basic test

$ wget http://www-us.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz  // available for download

$ tar zxvf kafka_2.12-2.2.1.tgz
$ sudo mv kafka_2.12-2.2.1 /usr/local/kafka

$ cd /usr/local/kafka

$ vi config/server.properties     // check, and change if needed, where log data is stored
log.dirs=/tmp/kafka-logs/

When a topic is created, its data can be found under the log.dirs directory set above.


  • Register the Kafka service

$ sudo vi  /etc/systemd/system/kafka.service          // create a new file
[Unit]
Description=kafka-server
After=network.target

[Service]
Type=simple
User=root
Group=root
SyslogIdentifier=kafka-server
WorkingDirectory=/usr/local/kafka
Restart=no
RestartSec=0s
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

$ sudo systemctl daemon-reload
$ sudo systemctl enable kafka.service
$ sudo systemctl start kafka.service
$ sudo systemctl status kafka.service



  • Verify that Kafka is running

$ sudo apt install lsof 
$ sudo lsof -i :2181                       // when only ZooKeeper is running
COMMAND   PID      USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    18819 zookeeper   43u  IPv6 759187      0t0  TCP *:2181 (LISTEN)

$ sudo lsof -i :2181      // when both ZooKeeper and Kafka are running
COMMAND   PID   USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    22350 nvidia  109u  IPv6 767085      0t0  TCP *:2181 (LISTEN)
java    22350 nvidia  110u  IPv6 761679      0t0  TCP localhost:2181->localhost:46018 (ESTABLISHED)
java    22675 nvidia  109u  IPv6 766151      0t0  TCP localhost:46018->localhost:2181 (ESTABLISHED)



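2.4  Testing Kafka

Kafka is said to store each topic as files on disk and to exchange messages through them.
Create a new topic, testTopic, as shown below, then send messages to it and read them back.

  • Create testTopic in Kafka and check it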

$ cd /usr/local/kafka
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
Created topic "testTopic".

$ bin/kafka-topics.sh --list --zookeeper localhost:2181     // list the created topics
testTopic

$ ls /tmp/kafka-logs/     // check that testTopic exists
..
testTopic-0/               
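
The testTopic-0 directory follows Kafka's <topic>-<partition> naming for partition directories under log.dirs. As a sketch of how that layout can be read back, the helper below groups such directories by topic. topic_partitions is a hypothetical name, and the demo builds a throwaway directory rather than reading /tmp/kafka-logs, so it runs without a broker.

```python
import os
import tempfile
from collections import defaultdict


def topic_partitions(log_dir):
    """Map each topic to its sorted partition numbers found under log_dir."""
    if not os.path.isdir(log_dir):
        return {}
    found = defaultdict(list)
    for entry in os.scandir(log_dir):
        if not entry.is_dir():
            continue  # skip checkpoint and metadata files
        # Split on the last "-" because topic names may contain dashes.
        topic, sep, partition = entry.name.rpartition("-")
        if sep and topic and partition.isdigit():
            found[topic].append(int(partition))
    return {topic: sorted(parts) for topic, parts in sorted(found.items())}


# Demo on a temporary directory that mimics the listing above.
with tempfile.TemporaryDirectory() as demo_dir:
    os.mkdir(os.path.join(demo_dir, "testTopic-0"))
    print(topic_partitions(demo_dir))
```

On the machine from this post, topic_partitions("/tmp/kafka-logs") would be the call to try.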


  • Send messages to Kafka (testTopic)

$ cd /usr/local/kafka
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic    
>Welcome to kafka
>This is my first topic      // Ctrl+C


  • Check the messages received from Kafka (testTopic)

$ cd /usr/local/kafka
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
Welcome to kafka
This is my first topic


Kafka installation guides
  https://zzsza.github.io/data/2018/07/24/apache-kafka-install/
  https://tecadmin.net/install-apache-kafka-ubuntu/
  https://linuxhint.com/install-apache-kafka-ubuntu/
  https://jwon.org/install-kafka-on-ubuntu/
  https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-18-04


2.5  Using systemctl

  • Check the systemd default target

$ systemctl get-default
multi-user.target

$ sudo systemctl set-default graphical.target  // switch to the graphical (X Window) login

$ systemctl get-default
graphical.target


  • List systemd targets

$ systemctl list-units --type target
UNIT                  LOAD   ACTIVE SUB    DESCRIPTION              
basic.target          loaded active active Basic System             
bluetooth.target      loaded active active Bluetooth                
cryptsetup.target     loaded active active Encrypted Volumes        
getty.target          loaded active active Login Prompts            
graphical.target      loaded active active Graphical Interface      
local-fs-pre.target   loaded active active Local File Systems (Pre) 
local-fs.target       loaded active active Local File Systems       
multi-user.target     loaded active active Multi-User System        
network-online.target loaded active active Network is Online        
network.target        loaded active active Network                  
nfs-client.target     loaded active active NFS client services      
paths.target          loaded active active Paths                    
remote-fs-pre.target  loaded active active Remote File Systems (Pre)
remote-fs.target      loaded active active Remote File Systems      
slices.target         loaded active active Slices                   
sockets.target        loaded active active Sockets                  
sound.target          loaded active active Sound Card               
swap.target           loaded active active Swap                     
sysinit.target        loaded active active System Initialization    
time-sync.target      loaded active active System Time Synchronized 
timers.target         loaded active active Timers                   

LOAD   = Reflects whether the unit definition was properly loaded.
ACTIVE = The high-level unit activation state, i.e. generalization of SUB.
SUB    = The low-level unit activation state, values depend on unit type.


systemctl usage
  https://www.lesstif.com/pages/viewpage.action?pageId=24445064


How to register a system service
  https://fmd1225.tistory.com/93
  https://pinedance.github.io/blog/2017/09/12/Ubuntu-16.04-system-service-%EB%93%B1%EB%A1%9D%ED%95%98%EA%B8%B0