Fluentd overview

Neng Liangpornrattana
5 min readApr 14, 2020

--

2–3 อาทิตย์ก่อนต้อง tune fluentd ครับ พอไปดู configuration มันแล้ว มีของเยอะมากๆ บางตัวก็เหมือนจะเข้าใจแต่ไม่เข้าใจ จะทำไงดี ก็ไปแกะ source code มันสิครับ

พอทำความเข้าใจมันได้ ก็ต้องเตรียมแผน scale อีก เลยต้องเข้าใจภาพรวมมันก่อน จะทำไงดี ก็แกะ code มันอีกรอบ คราวนี้แกะตั้งแต่ต้นทางยันปลายทางเลย จะได้เห็นคอขวดว่าตรงไหนน่าจะเกิด พอเห็นภาพกว้างๆ แล้วก็เอามาสรุปเอาไว้หน่อยดีกว่า

fluentd คืออะไร?

ในเว็บมันว่าไว้งี้ครับ

Fluentd is an open source data collector for unified logging layer.

ถูกสร้างขึ้นด้วยภาษา ruby

ซึ่งภาพจำของหลายๆ คนเนี่ย fluentd จะเป็นเครื่องมือเอาไว้จัดการ logging แต่จริงๆ แล้วเนี่ย input data ที่เข้ามามันเป็นอะไรก็ได้ ขอให้มัน parse ได้ก็พอ

แต่อย่าไปดันทุรังเลยครับ เครื่องมือส่วนใหญ่ที่มี มันไว้จัดการ log หรือ event stream ก็ใช้มันเท่าที่มีแหละครับ ถ้าจะไป hack มากไปเดี๋ยวเหนื่อย

ไอ้คำว่าจัดการนี่มันจัดการอะไรบ้าง??

ก็คือรับ input data มา แล้วเอามาตัดแต่ง ปรับ เพิ่มเติม หรือลด แล้วก็ส่งไปไปขา output

  • input ก็อาจจะมาจาก log file หรือ kafka หรือจะรับมาทาง tcp socket หรือแม้กระทั่งจะเปิด api ให้ http request มาก็ยังได้ แล้วก็ parse มันให้อยู่ในรูป key-value ให้ได้
  • ปรับแต่งค่า หรือ filter หรือเพิ่มหรือลด key ที่ parse เข้ามาจากขา input ด้วย DSL ที่ fluentd เตรียมไว้
  • output ออกไป kafka หรือ elaticsearch หรือไป database system หรือออกไปเป็น text file ก็ยังได้

หรือเครื่องมือที่มันมียังไม่พอใจ ก็เขียนเองมันเลยครับ มันมีระบบ plugin ให้ใช้อยู่

อ่านข้อมูลแบบละเอียดๆ ได้ที่

แล้ว(ผม)เอาไปใช้ทำอะไร

เอาไปเป็นเครื่องมือไว้ส่ง log ไป kafka เพื่อทำ centralized logging system ครับ

เนื่องจากเครื่องไม้เครื่องมือที่ใช้งานอยู่ ถูก deploy ไว้บน k8s cluster ครับ ทางเว็บ fluentd ก็ขายไว้ว่ารองรับการทำงานบน k8s ไว้ ก็คิดว่าเป็นเครื่องมือที่เหมาะสมดีครับ เพราะมีท่าที่ไว้ deploy บน k8s ไว้พอดี

pipeline ของผมง่ายมาก คือมี

  • input เป็น log file ที่ออกมาจาก stdout และ stderr จาก container ใน pod บน k8s cluster โดยใช้ plugin ที่ชื่อว่า tail
  • parse ให้เป็น key-value object ของ fluentd (ซึ่งจริงๆ ก็คือ ruby object แหละ) จาก format ของ access log, modified nginx log, envoy และ json
  • มีปรับ log อย่าง group พวก multiline log หรือ เพิ่ม time key ไป กรณีที่มัน parse เวลาออกมาไม่ได้
  • output เป็น kafka โดยใช้ plugin ที่ชื่อว่า kafka

เอาเป็นแนะนำให้รู้จักกับ fluentd คร่าวๆ ประมาณนี้ละกันครับ

เข้าเรื่อง!!

ก่อนจะลงรายละเอียด ขอเล่าเพิ่มอีกนิด คือ ถ้าเราอยากให้ fluentd ไปเก็บ data หรือ log จากไหน ก็ไปติดตั้ง fluentd process(agent) ที่เครืองนั้น

ทีนี้ ตอนเริ่ม process นี่แหละ จะเริ่มลงรายละเอียดแล้วครับ คือ

Start process

เริ่มด้วยการสั่ง command ให้เริ่ม process ก่อน อย่างของผมก็เป็น daemonset ไป ก็เป็นกระบวณการเริ่มของ k8s

ทีนี พอเครื่องมือเราสั่งให้ agent ทำงาน ด้วยคำสั่งพร้อม parameters อีกมากมาย สิ่งที่มันเริ่มทำคือ มันจะเช็คก่อนว่าจะให้รันโหมดไหน จาก parameters ที่ใส่มา โดยโหมดหลักๆ จะมี

  • standalone หรือ single worker คือ มี process เดียว
  • multi-worker หรือ มีหลาย process ทำงานแบบขนาน ซึ่งแบบนี้ มันจะมีเครื่องมือ มาช่วย spawn process ซึ่ง fluentd ใช้เครื่องมือที่ชื่อว่า ServerEngine

ทั้งสองแบบ ถูก wrap ให้ทำงานผ่าน supervisor ที่เป็นจุดเริ่มต้นของทุก process จะต่างกันเพียงแบบ standalone จะถูก supervisor สั่งเริ่มใน code เลย แต่แบบ multi-worker จะมีการ spawn process ตามจำนวน worker พร้อม command ที่จะสั่งให้เริ่มทำงาน

Initiate root agent

โดย root agent นี่คือพระเอกแล้ว เป็นตัวคุม flow เลย

โดยเริ่มจากไปอ่าน config file ก่อน ซึ่ง config file นี้ก็คร่าวๆ คือ มันจะบอกว่าเราจะจัดการกับ data ยังไงนี่แหละ จะมีส่วนที่ระบุ source input แล้วจะ filter ยังไง แล้ว output destination จะไปที่ไหน ตามลำดับที่ประกาศ(rule)ไว้ใน config file แล้วเอาไปเก็บไว้ในสิ่งที่เรียกว่า EventRouter

Initiate input

จากการอ่าน config file แล้ว ก็รู้แล้วว่าจะไปเอาข้อมูลจากไหน ก็เริ่มสร้าง input thread ตาม type ที่ระบุไว้ใน config file หลังจากนี้ input thread นี้ ก็เริ่มอ่าน log file ที่ระบุไว้ แล้วก็ parse ตามที่ระบุไว้ parser วนไปเรื่อย

ทีนี้ พอ parse เสร็จ มันจะบันทึกตำแหน่งที่อ่านลง pos_file ก่อน เพื่อให้แน่ใจว่าตอน restart process จะไม่อ่านสิ่งที่อ่านไปแล้ว จากนั้นจะเริ่มส่งข้อมูล(emit)ให้ไปที่ขา output โดยการส่งผ่านตัว EventRouter โดยวน loop rules ตามลำดับที่ประกาศไว้ใน config file ก็จะต้องผ่าน filter ก่อน หลังจากคัดกรองเสร็จถึงจะส่งไปที่ output ได้

อธิบาย ข้อมูล หรือ record เพิ่มนึดนึง คือ 1 record ใน fluentd เรียก event ที่ประกอบร่างจากการ parse จะประกอบไปด้วย 3 ส่วน คือ

  • tag เสมือนเส้นทางหรือกลุ่ม บอกว่าจะวิ่งไปทางไหน หรือเกี่ยวกับอะไร
  • time คือ event นี้เกิดขึ้นตอนไหน
  • record คือตัว data จริงๆ ที่เรา parse มันออกมาเป็น key-value

ซึ่งจะต้องมี 3 สิ่งนี้ไปด้วยกันเสมอๆ

มันยังมีอีกสิ่งหนึ่งเรียกว่า label คล้ายๆ กับ tag แต่จะกำหนดให้มันกระโดดไป label ไหนก็ได้ ต่างจาก tag ที่ flow จะวิ่งจากบนลงล่าง

Initiate output

กระบวณการนี้ จริงๆ แล้ว เป็นแค่ config ที่ถูกระบุไว้ให้ทำงานท้ายสุดของ pipeline เลย หลังจากที่ input thread ได้ emit record นึงผ่าน EventRouter มันจะมาทำตาม rules ใน EventRouter โดยผ่าน filter มาก่อน จนถึง output นี่แหละ

Output modes

มี 3 modes

  • Non-bufferred mode คือไม่มี buffer เลย พอมีการ emit record มาถึง output มันก็ส่งต่อไปหา output plugin เลยว่าจะให้ทำอะไรต่อไป โดยไม่ผ่าน buffer
  • Synchronous Buffered mode ขอยกไปลงรายละเอียดด้านล่าง
  • Asynchronous Buffered mode เหมือน Synchronous Buffered mode แต่ commit เพื่อเอา chunk ออกจาก buffer ทีหลัง ในเวลาที่กำหนด ขอยกไปลงรายละเอียดด้านล่างอีกเช่นกัน

นอกจาก buffer แล้ว ยังสามารถเซ็ต secondary output ได้ด้วย ไว้ในกรณีที่ส่งไปหา output แล้วไม่สำเร็จ เรายังบังคับมันให้ลงถังนี้ได้ ซึ่งถังนี้ถูกบังคับให้ใช้ buffer ด้วย

พอเริ่ม initiate output แล้วเนี่ย หลังจาก set ค่าต่างๆ จาก config แล้ว ก็เริ่มทำงานด้วยการเช็คก่อนว่าเป็นแบบ buffer หรือ non-buffer ซึ่งถ้าเป็น non-buffer มันก็ก็แค่ปิดระบบ buffer เท่านั้นเอง แต่ถ้าเป็นแบบ buffer จะมี flow ที่ผมเห็นว่าสำคัญคือสร้าง output thread จากจำนวน flush_thread_countที่ config มา

flush_thread_count

จะเป็น จำนวน output thread ที่ถูกสร้างขึ้นมาใน 1 process ซึ่ง output thread นี้จะทำหน้าที่ค่อยดึงของที่อยู่ใน คิวของ buffer ออกมานั่นเอง แต่สิ่งที่เกิดขึ้นเพิ่มเติม คือ มันสร้าง thread มาอีกตัวครับ ทีเอาไว้ดึงของจากใน buffer มาเข้าคิว แปลว่า จริงๆ แล้วจะมี thread เกิดขึ้น สองเท่าของ flush_thread_count เช่น ตั้งค่าให้เป็น 2 จะมี thread ที่ไว้เอาของออกมาจาก buffer ไปเข้าคิว 1 ตัว และอีกหนึ่งตัวเอาของออกจากคิวไป output ซึ่งคิวนี้เดี๋ยวจะอธิบายในหมวดของ buffer ครับ

อีกหนึ่งสิ่งที่ชวนงงถ้าไม่คุ้นเคยกับ thread ของ ruby ใน thread นี้ คือมันเป็น thread ของ ภาษา ruby ครับ มันจะไม่ทำงานลักษณะ parallel แบบ JVM นะครับ หลักการมันคือ ใน ruby จะมีสิ่งที่เรียกว่า Global Interpreter Lock(GIL) ซึ่งสิ่งนี้มันเป็นระบบ lock ที่จะป้องกันการที่ 2 thread หรือมากกว่าถูก execute ในเวลาเดียวกัน ซึ่งนี่เป็นคุณสมบัติหนึ่งของ ruby ที่ให้ความสำคัญกับเรื่อง thread safe ใน ruby เลย

ถ้ามันไม parallel แบบ JVM แล้วมันเป็นแบบไหน?? คือหลังจาก thread ใด thread นึง อยู่ใน lock ของ GIL แล้วเนี่ย มันจะออกจาก lock นี้ก็ต่อเมื่อมันมี blocking IO แล้วก็จะมี thread อื่นเข้ามาแทน จนกว่าจะได้แจ้งว่า blocking IO เสร็จแล้ว ก็จะมีกลไกที่ทำให้เข้ามาอยู่ใน lock เพื่อทำงานต่อ ซึ่งดูจากหลักการแล้วมันก็ยังมีสลับๆ กันทำงานอยู่ เพราะสุดท้ายแล้วมันก็ทำงานได้แค่ใน 1 cpu core (ก็เลยมีเรื่อง multi-worker เข้ามาช่วย)

อีกอย่างที่อยากเอ่ยถึงคือการใช้ Mutex คือ ระบบ data structure ใน ruby เพื่อให้แน่ใจได้ว่า ในช่วงเวลาหนึ่ง จะมี thread ตัวเดียวเท่านั้นเข้าถึง ซึ่งพอมีการใช้ thread ก็จะมีการเรียกใช้ mutex เพื่อให้แน่ใจว่าจะไม่เกิด race condition ใน fluentd (ซึ่งจริงๆ ก็เป็น practice ของ ruby อยู่แล้ว)

Buffer

หลังจาก initiate output แล้ว ใน output นี้ จะมีสิ่งที่เรียกว่า buffer ดักอยู่ก่อน ซึ่งจริงๆ มันสามารถเซ็ตได้ว่าจะให้ใช้ buffer หรือเปล่า ในกรณี ouput kafka ที่ผมใช้นี้ มีการเปิดให้ใช้ buffer ครับ

การมี buffer ช่วยให้มีที่เก็บ data ไว้ชั่วคราว กรณีที่ output ปลายทางของเรามีปัญหาทำให้เราไม่สามารถที่จะส่งข้อมูลไปหาได้ มันจะยังไม่เอาออกจาก buffer ครับ มันจะช่วยให้ข้อมูลไม่หาย ซึ่ง fluentd มีกลไกการ retry ไว้ให้ครับ แต่ผมขอข้าม

Buffer plugins

buffer plugin สามารถเซ็ตได้สองประเภทคือ

  • memory ข้อดีคือเร็ว แต่ถ้ามี process ตายแบบไม่ graceful shutdown ข้อมูลก็หายเหมือนกัน
  • file ข้อดีคือ ข้อมูลไม่หายเหมือนแบบ memory แต่ช้ากว่า แต่ก็แลกมาด้วย storage ที่ใหญ่ขึ้น

โดยทั้งสองแบบจะมี flow การทำงานใกล้เคียงกัน จะมีก็เพียงแต่วิธีเขียนหรืออ่าน จาก buffer ยังไง มันลงรายละเอียดมาก ผมยังไม่ขอลงลึก

การทำงานคร่าวๆ ของ buffer คือ มันจะถูกแบ่งเป็น chunk หรือก้อนของ data ที่รวมหลาย record ไว้ พอถึง threshold ที่กำหนด ก็แบ่งเป็น chunk(state :staged) แล้วเอา chunk นี้เข้าคิว(state :queued) เพื่อให้ ouput thread ดึงออกไป

Synchronous Buffered mode vs Asynchronous Buffered mode

สองโหมดนี้ต่างกันแค่

  • Sync จะ implement ใน method ที่ชื่อว่า writeและเอา chunk ออกจากคิว(:queue) เลย
  • Async จะ implement ใน method ที่ชื่อว่า try_write แต่จะมีการเซ็ตค่าเวลาใน chunk ว่าจะให้หมดอายุตอนไหน แล้วจะมีตัวคอยเช็คอยู่ตลดว่า expire หรือยัง ถ้า expire แล้วก็เอา chunk นี้ออก

พอรวม :stage และ :queue นี่คือทั้งหมดของ buffer ทีมี แต่จะต้องไม่เกิน total_limit_size ที่เรา config ได้ มิฉะนั้นจะเกิด BufferOverflowError

การ handle BufferOverflowError ผมก็คิดว่าสำคัญ มันจะสามารถตั้งค่า overflow_action ได้ 3 รูปแบบคือ

  • throw_exception คือ raise exception ขึ้นมาเลย ไม่ให้เอาของเขาใน buffer แล้ว อันนี้เข้าไม่ได้คือหายเลยนะ
  • block คือ ไปหยุดให้ input หยุดส่ง data มาก่อน รอ buffer พร้อมแล้วค่อยส่งมา
  • drop_oldest_chunk คือ เอา chunk ที่เก่าสุดออก(หายเหมือนกัน) แล้วค่อยเอาของเข้า buffer

ถ้าไม่ติดปัญหาอะไรจากนี้แล้ว ก็จะไปถึงส่วนของการทำงานใน write หรือ process ก็ขึ้นอยู่กับว่าเป็น sync หรือ async buffer

Kafka plugin

จริงๆ ผมไม่ค่อยเห็นอะไรสำคัญในนี้เท่าไหร่ จากคุณสมบัติมัน มีการ implement ด้วย write ก็คือต้องมีการใช้งานระบบ buffer ที่เหลือก็มีการสร้าง kafka connection เพื่อไว้ produce หา kafka brokers ต่อไป

สรุป

ตามภาพเลยครับ ลองสรุปให้ได้ใน 1 flow จาก input ถึง output

Fluentd Plugin API overview

ปัญหา!!!

อาจจะเจ็บมาไม่เยอะ แต่ก็เจ็บครับ 5555

เร่ิมจากฝั่ง input ก่อน คือ ปัญหาที่ peak ที่เคยเจอคือ เจอ log 1 บรรทัดที่ยาวมากๆ จนมัน parse ไม่ได้ ซึ่งในฝั่ง tail plugin เนี่ย มันจะทะยอยๆ อ่านจาก io จาก log file บนเครื่อง server มาทีละ 8192 ตัวอักษร ซึ่งตรงนี้มัน hard-code ไว้ แต่ไม่ใช่ปัญหาอะไร ปัญหามันคือมันพยายามจะหา end-of-line character(\n) แต่มันหาไม่เจอ!!! มันก็วน loop ไปเรื่อยๆ จน memory เต็ม ติด limit ของ k8s ที่ตั้งไว้ pod ก็ restart ตัวเอง ซึ่งตอนนี้ก็ยังแก้อะไรไม่ได้ ต้องไปปรับ log ใน 1 บรรทัดไม่ให้มีขนาดใหญ่เกินไป

ฝั่ง buffer ก็ยังมี แต่จริงๆ ก็ปนๆ กันกับฝั่ง output เหมือนกัน คือ เซ็ต total_limit_size ไว้น้อยเกินไป ไม่พอตอน peak time คือฝั่ง input มันก็อ่านของมันแล้วก็ emit เข้ามาเรื่อยๆ แต่ดันเอาออกจาก buffer ไม่ทัน จนเกิด BufferOverflowError มาเจอต่อว่า overflow_action ตั้งค่าเป็น default ไว้คือ throw_exception คือใส่มาใน buffer ไม่ได้ เลยหายไปเลยนะ เลยปรับให้เป็น block แทน

ทีนี้ ปัญหาต่อมาคือมันเอาของออกช้า เลยต้องปรับค่าเพิ่มอีก 2 ตัวที่ขา output คือเพิ่ม flush_thread_count แต่ก็ยังไม่พอ ต้องเพิ่มขนาดของ queued_chunks_limit_size ด้วย หน้าที่ของมันคือ ตอนที่เอา chunk มาเข้าคิวให้เพื่อให้ output thread มาอ่าน มันต่อคิวได้ตามจำนวนของ queueud_chunks_limit_size

ค่าที่สมควรจะเป็นคือ ให้เท่ากับ flush_thread_count เลย ตัว thread ที่สร้างไว้จาก flush_thread_count จะได้ไม่มีตัวว่างงาน

แผนในอนาคต

ตอนนี้คือมันยัง scale ได้อีกครับ สิ่งที่จะทำคือ multi-worker ซึ่งท่าปัจจุบันที่ใช้ตอนนี้คือ เป็นแบบ single worker ซึ่งปัญหาที่จะเจอคือถ้าแยกเป็น process หลายๆ ตัว ด้วยค่า configuration เดิม มันจะอ่าน log file ที่ซ้ำกัน เพราะฉะนั้น จะต้องแยกกำหนดให้เป็นประมาณว่า process ตัวแรก อ่านไฟล์ที่ขึ้นต้นด้วย a-l อีก process อ่านไฟล์ที่ขึ้นต้นด้วย m-z อะไรประมาณนี้ครับ แล้วแต่ละ process ก็จะมี path ตามที่ผมเล่ามาตั้งแต่ initial input เลย

ไว้ลอง scale แล้วจะมาแชร์ตัวเลขครับ ว่า improvement เป็นยังไงบ้าง ในเว็บของ fluentd เอง ก็ claim ไว้ว่าได้ถึง 5,000 messages/sec เลย แต่ของผมอาจจะไม่ถึงครับ เพราะ parser และ grouping เยอะเหลือเกิน 555

ปล. ข้อมูลส่วนใหญ่ก็เอามาจาก source repository ของ fluentd และ document นั่นแหละครับ

--

--

Neng Liangpornrattana

A data plumber, basketballer, workout addicted, dog and cat lover