<ul id="qxxfc"><fieldset id="qxxfc"><tr id="qxxfc"></tr></fieldset></ul>


      [譯]使用golang每分鐘處理百萬請求


      在Malwarebytes,我們正在經(jīng)歷驚人的增長,自從我在1年前加入硅谷的這家公司以來,我的主要職責(zé)是為多個(gè)系統(tǒng)做架構(gòu)和開發(fā),為這家安全公司的快速發(fā)展以及百萬日活產(chǎn)品所必需的基礎(chǔ)設(shè)施提供支持。我曾在一些不同的公司從事反病毒和反惡意軟件行業(yè)超過12年,我知道這些系統(tǒng)最終會因?yàn)槲覀兠刻焯幚淼拇罅繑?shù)據(jù)而變得十分復(fù)雜。

      有趣的是,在過去9年左右的時(shí)間里,我所參與的所有Web后端的開發(fā)工作大部分都是在Rails <https://github.com/rails/rails>
      框架的基礎(chǔ)上用ruby實(shí)現(xiàn)的。不要誤解我的意思,雖然我很喜歡Rails框架和Ruby,我相信這是一個(gè)會讓你感到驚嘆的環(huán)境,但一段時(shí)間之后,你就會用ruby的方式來進(jìn)行思考和設(shè)計(jì)系統(tǒng),卻忘記了本來可以利用多線程,并行化,快速執(zhí)行和小內(nèi)存開銷來使你的軟件架構(gòu)變得如此高效和簡單。作為一個(gè)多年的C
      / C ++,Delphi和C#開發(fā)人員,我也同樣開始意識到了如何在工作中使用正確的工具來降低事情的復(fù)雜度。


      作為首席架構(gòu)師,我不太重視對互聯(lián)網(wǎng)所進(jìn)行的語言和框架之爭。我相信軟件的效率(efficiency),生產(chǎn)力(productivity)和代碼可維護(hù)性主要取決于你構(gòu)建解決方案的簡單程度。

      問題

      在構(gòu)建我們的匿名檢測和分析系統(tǒng)時(shí),我們的目標(biāo)是能夠處理來自數(shù)百萬個(gè)端點(diǎn)的的大量POST請求。
      Web處理程序?qū)⒔邮找粋€(gè)JSON文檔,該文檔可能包含了需要寫入Amazon
      S3(注:這個(gè)是亞馬遜的云計(jì)算服務(wù)平臺)的許多負(fù)載(payload)的集合,以便我們的map-reduce系統(tǒng)稍后對這些數(shù)據(jù)進(jìn)行處理。

      從傳統(tǒng)上來說,我們會考慮利用以下工具(基本都是開源的)創(chuàng)建一個(gè)工作層架構(gòu):

      * Sidekiq <https://github.com/mperham/sidekiq>
      * Resque <https://github.com/resque/resque>
      * DelayedJob <https://github.com/collectiveidea/delayed_job/>
      * Elasticbeanstalk Worker Tier
      * RabbitMQ <https://www.rabbitmq.com/tutorials/tutorial-one-ruby.html>
      * and so on…
      然后創(chuàng)建2個(gè)不同的集群,一個(gè)用于Web前端,另一個(gè)用于后臺工作的處理,以擴(kuò)展可以處理的后臺工作的數(shù)量。


      但是從一開始,我們的團(tuán)隊(duì)就知道我們應(yīng)該使用go語言進(jìn)行開發(fā),因?yàn)樵谟懻撾A段我們就意識到了這可能是一個(gè)吞吐量非常大的系統(tǒng)。我使用go大概有2年左右的時(shí)間,我們也開發(fā)了一些系統(tǒng),但是沒有一個(gè)系統(tǒng)有如此大的吞吐量。

      我們開始創(chuàng)建了一些結(jié)構(gòu)體,用于定義通過POST調(diào)用獲取的網(wǎng)絡(luò)請求負(fù)載(payload),同時(shí)定義了一個(gè)方法將這些負(fù)載上傳到S3。
      type PayloadCollection struct { WindowsVersion string `json:"version"` Token
      string `json:"token"` Payloads []Payload `json:"data"` } type Payload struct {
      // [redacted] } func (p *Payload) UploadToS3() error { // the storageFolder
      method ensures that there are no name collision in // case we get same
      timestamp in the key name storage_path := fmt.Sprintf("%v/%v", p.storageFolder,
      time.Now().UnixNano()) bucket := S3Bucket b := new(bytes.Buffer) encodeErr :=
      json.NewEncoder(b).Encode(payload) if encodeErr != nil { return encodeErr } //
      Everything we post to the S3 bucket should be marked 'private' var acl =
      s3.Private var contentType = "application/octet-stream" return
      bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl,
      s3.Options{}) }
      從原生方法到Go協(xié)程

      一開始我們用非常原生的方法來實(shí)現(xiàn)POST句柄,嘗試通過使用一個(gè)簡單的協(xié)程來將作業(yè)的處理并行化:
      func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method !=
      "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body
      into a string for json decoding var content = &PayloadCollection{} err :=
      json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err !=
      nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8")
      w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and
      queue items individually to be posted to S3 for _, payload := range
      content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS }
      w.WriteHeader(http.StatusOK) }

      對于適當(dāng)?shù)妮d荷,上面的方法在大多數(shù)情況下能夠工作,但是在大規(guī)模載荷的情況下,上面的方法很快被證明不能夠發(fā)揮很好的作用。我們想像會有很多的請求,但是當(dāng)部署第一個(gè)版本到生產(chǎn)環(huán)境時(shí),沒有想到會有如此的量級。我們完全低估了載荷的數(shù)量。

      上述方法在幾個(gè)方面都很糟糕。它沒有辦法控制產(chǎn)生的go協(xié)程數(shù)量。既然我們每分鐘會收到百萬請求,這段代碼會很快崩潰。

      再試一次


      我們需要尋找別的方法。開始我們就討論了我們需要保持請求句柄的生命周期的短暫性以及請求的處理要在后臺進(jìn)行。當(dāng)然,這是在Rails的世界中用Ruby必須要做到的,否則會阻塞所有能用的web工作處理器,不論你是使用puma,unicorn或者passenger。然后我們需要利用常見的解決方案來做到這一點(diǎn),例如Resque,Sidekiq,SQS等。當(dāng)然還有很多其它方法也能做到這一點(diǎn),

      所以第二次迭代中我們會創(chuàng)建一個(gè)緩沖通道(buffered
      channel),作業(yè)可以插入到緩沖通道中并將作業(yè)的負(fù)載上傳到S3,因?yàn)槲覀兛梢钥刂凭彌_通道中元素的數(shù)量,并且我們有大量的內(nèi)存將作業(yè)插入緩沖通道,我們認(rèn)為這種方法是沒有任何問題的。
      var Queue chan Payload func init() { Queue = make(chan Payload, MAX_QUEUE) }
      func payloadHandler(w http.ResponseWriter, r *http.Request) { ... // Go through
      each payload and queue items individually to be posted to S3 for _, payload :=
      range content.Payloads { Queue <- payload } ... }
      然后從緩沖通道中取出作業(yè)進(jìn)行處理,像下面這樣:
      func StartProcessor() { for { select { case job := <-Queue:
      job.payload.UploadToS3() // <-- STILL NOT GOOD } } }

      說實(shí)話,我不知道我們在想什么。這個(gè)夜晚注定是用紅牛度過的。這種方法并沒有給我們帶來任何好處,我們用緩沖隊(duì)列代替了有缺陷的并發(fā),但這只是推遲了問題。我們的同步處理器一次只向S3上傳一個(gè)有效載荷(payload),由于傳入請求的速率遠(yuǎn)遠(yuǎn)大于單個(gè)處理器上傳到S3的能力,我們的緩沖通道很快就達(dá)到了極限并阻止了請求句柄可以添加更多作業(yè)的能力。

      我們只是避免了這個(gè)問題,系統(tǒng)的死期最終也進(jìn)入了倒計(jì)時(shí)。在我們部署這個(gè)有缺陷的版本幾分鐘后,延遲率會以固定的速率增加。



      更好的解決方案

      為了創(chuàng)建一個(gè)2層的channel系統(tǒng),我們決定使用一個(gè)通用模式,一個(gè)用來插入作業(yè),一個(gè)用來控制作業(yè)隊(duì)列上同時(shí)運(yùn)行的工作協(xié)程。

      我們的想法是將并行上傳穩(wěn)定在一個(gè)可持續(xù)的速率,這不會削弱機(jī)器的性能,也不會產(chǎn)生到S3的連接錯(cuò)誤。所以我們選擇創(chuàng)建了一個(gè)Job /
      Worker模式。對于熟悉Java,C#等的人來說,想像一下如何以golang的方式用channel來實(shí)現(xiàn)一個(gè)worker線程池。
      var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") )
      // Job represents the job to be run type Job struct { Payload Payload } // A
      buffered channel that we can send work requests on. var JobQueue chan Job //
      Worker represents the worker that executes the job type Worker struct {
      WorkerPool chan chan Job JobChannel chan Job quit chan bool } func
      NewWorker(workerPool chan chan Job) Worker { return Worker{ WorkerPool:
      workerPool, JobChannel: make(chan Job), quit: make(chan bool)} } // Start
      method starts the run loop for the worker, listening for a quit channel in //
      case we need to stop it func (w Worker) Start() { go func() { for { // register
      the current worker into the worker queue. w.WorkerPool <- w.JobChannel select {
      case job := <-w.JobChannel: // we have received a work request. if err :=
      job.Payload.UploadToS3(); err != nil { log.Errorf("Error uploading to S3: %s",
      err.Error()) } case <-w.quit: // we have received a signal to stop return } }
      }() } // Stop signals the worker to stop listening for work requests. func (w
      Worker) Stop() { go func() { w.quit <- true }() }
      我們修改了Web請求句柄,創(chuàng)建一個(gè)帶有負(fù)載的Job結(jié)構(gòu)體實(shí)例,并將其發(fā)送到JobQueue channel中以供workers獲取。
      func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method !=
      "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body
      into a string for json decoding var content = &PayloadCollection{} err :=
      json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err !=
      nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8")
      w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and
      queue items individually to be posted to S3 for _, payload := range
      content.Payloads { // let's create a job with the payload work := Job{Payload:
      payload} // Push the work onto the queue. JobQueue <- work }
      w.WriteHeader(http.StatusOK) }
      在我們的Web服務(wù)器初始化期間,我們創(chuàng)建了一個(gè)Dispatcher并調(diào)用Run()來創(chuàng)建works pool并開始偵聽即將出現(xiàn)在JobQueue中的作業(yè)。
      dispatcher := NewDispatcher(MaxWorker) dispatcher.Run()
      下面是dispatcher 的實(shí)現(xiàn)代碼:
      type Dispatcher struct { // A pool of workers channels that are registered
      with the dispatcher WorkerPool chan chan Job } func NewDispatcher(maxWorkers
      int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return
      &Dispatcher{WorkerPool: pool} } func (d *Dispatcher) Run() { // starting n
      number of workers for i := 0; i < d.maxWorkers; i++ { worker :=
      NewWorker(d.pool) worker.Start() } go d.dispatch() } func (d *Dispatcher)
      dispatch() { for { select { case job := <-JobQueue: // a job request has been
      received go func(job Job) { // try to obtain a worker job channel that is
      available. // this will block until a worker is idle jobChannel :=
      <-d.WorkerPool // dispatch the job to the worker job channel jobChannel <- job
      }(job) } } }
      注意,我們提供了添加到works pool 中的works 最大數(shù)量。既然我們在工程中使用了帶有容器化Go環(huán)境的Amazon
      Elasticbeanstalk,我們就會嘗試一直遵循12-factor <https://12factor.net>
      方法論來在生產(chǎn)環(huán)境中配置我們的系統(tǒng)。因此我們會從環(huán)境變量中讀取這些值。這樣我們就可以控制JobQueue的work數(shù)量,調(diào)整這些值后可以快速生效而無需重新部署集群。
      var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") )
      實(shí)時(shí)結(jié)果

      在我們部署新代碼之后,我們立即看到延遲率下降到了很小的數(shù)值,并且處理請求的能力急劇上升。




      在我們的Elastic負(fù)載均衡完全預(yù)熱幾分鐘之后,我們看到我們的ElasticBeanstalk應(yīng)用程序每分鐘處理了近100萬個(gè)請求。并且在早上的幾個(gè)小時(shí),請求流量飆升到了每分百萬之上。

      服務(wù)器的使用數(shù)量從100臺下降到了大概20臺。



      在我們正確配置了群集和自動擴(kuò)展功能之后,實(shí)例數(shù)量降到了4x EC2
      c4.Large(沒看懂,大概是這個(gè)意思),自動縮放配置好之后,只有CPU使用率超過90%并且維持5分鐘,才會產(chǎn)生一個(gè)新的實(shí)例。


      結(jié)論


      我信奉簡單致勝。我們原本設(shè)計(jì)了一個(gè)使用大量隊(duì)列和后臺wokers并且部署復(fù)雜的系統(tǒng),但我們決定使用Elasticbeanstalk的自動擴(kuò)展能力以及Golang為我們提供開箱即用的高效和簡單的并發(fā)方法。


      你總能為你的工作找到正確的工具。有時(shí)候在你的ruby系統(tǒng)中需要一個(gè)強(qiáng)大的web處理器,請考慮一下Ruby之外的生態(tài)系統(tǒng),你可以獲得更簡單但更強(qiáng)大的替代解決方案。

      原文鏈接
      <http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/>

      友情鏈接
      ioDraw流程圖
      API參考文檔
      OK工具箱
      云服務(wù)器優(yōu)惠
      阿里云優(yōu)惠券
      騰訊云優(yōu)惠券
      京東云優(yōu)惠券
      站點(diǎn)信息
      問題反饋
      郵箱:[email protected]
      QQ群:637538335
      關(guān)注微信

        <ul id="qxxfc"><fieldset id="qxxfc"><tr id="qxxfc"></tr></fieldset></ul>
          男女互操软件 | 欧美黄色录像大全 | 男生的鸡插入女生的鸡 | www.激情 | 欧美性猛交XXXX乱大交3蜜豆 | 久草视频在线播放 | 日日干欧美 | 亚洲无码成人网站 | 天干夜夜爽爽日日日日 | 亚洲国产成人无码影视 |