Rails(4.1)中的缓存管理

复杂业务场景下的缓存淘汰方案

Posted by Archfish on 2018-08-12

在rails中我们常用的有Rails#cacheIdentityCache这两种cache方式。在长期实践中发现了不少问题,于是我们研发了一个新的缓存组件CacheWorker。下面对这些组件进行大致介绍:

IdentityCache

这是一种ActiveRecord Caching组件,提供了若干读方法,通过after_commit回调对缓存进行淘汰,我们业务系统中使用的是Redis作为后端默认情况下该缓存无过期时间。 在实际使用中,比直接查询数据库大概快30%左右。如果数据没有通过统一rails应用进行更新,则会导致脏数据,所以这种方式在后期微服务拆分中会带来很大的问题。表的一行记录可能会很大,当使用的热数据很多时,对redis是个不小的压力,所以在redis中存放这种缓存是否真的划算我表示怀疑。

Rails Cache

这种缓存方式支持更灵活的缓存key定义,前期在系统中大量使用。根据查询结果中的最新更新时间作为cache key的条件之一实现在这部分数据发生变化时自动使用新的cache key,从而实现缓存的淘汰。就算不是通过同一Rails应用修改数据也能做到最大程度降低脏缓存出现的几率。

records     = Table.where(aa: :bb)
last_update = records.maximum(:updated_at)
count       = records.size
cache_key   = "#{prefix}:using_for:#{count}:#{last_update.to_i}"
cache_opts  = {
  expires_in: 15.minutes
}
result =  Rails.cache.fetch(cache_key, cache_opts) do
  logic
  ...
end

这种方式有几个问题:

  • 当records的查询条件发生变化时,可能会导致在小于max(update_at)的范围内增加或减少记录从而导致缓存变脏。

    要解决这种问题,只能在算法发生变化时主动删除相关缓存,但是靠人为去记忆哪些数据需要删除就很困难了,大项目中有谁敢打包票对每一处细节都非常熟悉呢?这时只能尽量降低缓存的生存时间,从而降低脏缓存存在的时间,但是某些情况下脏缓存会带来灾难性的后果。

  • 为了确定数据是否有发生变化额外增加两次SQL查询。

    随着数据库中的数据越来越多,这两个SQL操作的成本就越不能忽略。

  • 缓存的量达到一点程度时,主动清空缓存变成了负担。

    在使用中我们发现,当key总数在百万级别时,执行Rails.cache.clear几乎不可能成功,表现为执行了很长时间以后就会报systemstackerror错误。这是因为一次性从redis里拉回太多数据导致的,对数据分批后问题解决。

    # 分批清理Rails.cache缓存
    # NOTE Rails.cache 键值达到一定程度时会出现 `systemstackerror`
    #
    # options
    #   match String  匹配字符串
    #   count Integer 每批清理数目
    def wipe_all_cache!(match: '*', count: 10240)
      rails_options = Rails.cache.options
      redis = Redis.new(rails_options)
      match_str = "#{rails_options[:namespace]}:#{match}"
    
      cursor = '0'
      cleared_count = 0
      loop do
        cursor, keys = redis.scan(cursor, match: match_str, count: count)
    
        (cursor == '0' ? break : next) if keys.blank?
    
        redis.del(*keys)
        cleared_count += keys.count
      end
    
      cleared_count
    end
    

Cache Worker

该组件由缓存注册和事件处理两个部分组成。在初期版本中,我尝试在淘汰逻辑中删除redis中对应的key,但是Redis不支持设置Set中的key的过期时间,这就会导致长时间运行下某些Set中的key总数非常大,进行删除时会导致redis处理超时。后来通过设置比较短的TTL,通过修改model对应的一串key实现缓存淘汰。同时增加一个redis队列用于顺序处理过期事件。数据结构如下所示:

{
  model1_name: SecureRandom.urlsafe_base64(5),
  model2_name: SecureRandom.urlsafe_base64(5),
  model3_name: SecureRandom.urlsafe_base64(5)
}

queue: [:event1, :event2, :event3, :event4]

缓存注册

  • 相关Model需注册相关事件回调,在回调中发布相关事件

    在所有after_commit之后执行publish_event发布默认数据变更事件,其中还有该次提交所涉及的变化,在publish_event可以对缓存过期的条件进行自定义规则,从而实现更细致的缓存控制。

    def self.included(base)
      base.extend ClassMethods
      base.class_eval do
        after_commit :publish_event
      end
    end
    
    private
    def publish_event
      event_type = case
      when transaction_include_any_action?([:create])
        'create'
      when transaction_include_any_action?([:update])
        'update'
      when transaction_include_any_action?([:destroy])
        'destroy'
      end
    
      payload = {
        event_type: event_type,
        changed_attributes: self.previous_changes.keys,
        class: self.class.name,
        primary_key: self.id
      }
    
      ActiveSupport::Notifications.instrument(self.class.event_name, payload)
    end
    

    这里可以更进一步,对于update_alldelete_all也增加了一个回调,这样就完成了所有数据变更操作都有发布信息。

    # hack update_all 和 delete_all 同样发出消息
    module ::ActiveRecord
      class Relation
        alias_method :orig_update_all, :update_all
        alias_method :orig_delete_all, :delete_all
    
        def update_all(updates)
          result = orig_update_all(updates)
          if respond_to? :publish_event
            publish_event(__method__)
          end
          result
        end
    
        def delete_all(conditions = nil)
          result = orig_delete_all(conditions)
          if respond_to? :publish_event
            publish_event(__method__)
          end
          result
        end
      end
    end
    
  • 缓存Key注册

    在Rails#cache的基础上去掉updated_atsize后通过将相关model的当前序列拼接到后面。

    # options
    #   expires_in Integer second
    #   related_modules Array ActiveRecord class
    def cache(cache_key, options = {}, &block)
      if cache_key.blank? || Rails.env.test?
        return yield if block_given?
      end
    
      cache_options = (options || {}).slice(:expires_in, :compress, :race_condition_ttl)
      # 过期冗余,允许key在过期6秒内继续读取
      cache_options[:race_condition_ttl] ||= 6.seconds
    
      related_modules = Array(options[:related_modules] || [])
      # NOTE 默认有效期为3小时,键过期后会一直存在,只能依赖LRU清除,Redis须配置一个合适的Hz值
      if related_modules.blank? || cache_options[:expires_in].blank?
        cache_options[:expires_in] ||= 3.hours
      end
    
      cache_key = Reocar::CacheWorker.register_cache(cache_key, *related_modules)
    
      Rails.cache.fetch(cache_key, cache_options) do
        yield if block_given?
      end
    end
    
    def related_module_identity(module_names)
      module_names = wash(module_names)
      identity_hash = current_identity(module_names)
    
      identity_hash.each_pair.map do |k, v|
        "#{k}.#{v}"
      end.join(':').hexdigest
    end
    
    def update_module_identity!(module_names)
      module_names = wash(module_names)
      hash = current_identity(module_names)
    
      module_names.each do |k|
        cache_version = ''
        loop do
          cache_version = SecureRandom.urlsafe_base64(5)
          break if cache_version != (hash[k] || '')
        end
        hash[k] = cache_version
      end
    
      redis_client.mapped_hmset(self, hash)
    end
    
    def current_identity(module_names)
      module_names = wash(module_names)
    
      redis_client.mapped_hmget(self, *module_names)
    end
    
    def update_module_identity!(module_names)
      module_names = wash(module_names)
      hash = current_identity(module_names)
    
      module_names.each do |k|
        cache_version = ''
        loop do
          cache_version = SecureRandom.urlsafe_base64(5)
          break if cache_version != (hash[k] || '')
        end
        hash[k] = cache_version
      end
    
      redis_client.mapped_hmset(self, hash)
    end
    
    
    def wash(module_names)
      module_names = module_names.is_a?(Array) ? module_names : Array(module_names)
      module_names.map(&:to_s).sort
    end
    
    # 用法
    cache_key = 'current_key_name'
    cached_data = cache(cache_key, related_modules: [Model1, Model2]) do
      logic
      ...
    end
    

事件处理

  • 监听相关事件

    先注册事件监听,在config/initializers中增加subscribe_notification.rb文件,并增加以下内容:

    ActiveSupport::Notifications.subscribe(/^#{Reocar::Publisher::COMMIT_EVENT_PREFIX}/) do |*args|
      Reocar::CacheWorker.push(*args)
    end
    

    这样我们就可以监听到model发出的消息了。

  • 淘汰缓存

    监听到事件以后需要及时对缓存进行淘汰,由于我们还没迁移至sidekiq,所以决定启动一个专门处理该事件的工作线程。

    module Reocar
      module CacheWorker
        def push(*args)
          event = ActiveSupport::Notifications::Event.new(*args)
    
          queue_push(event.payload[:class])
          if worker.stop? && worker.status != 'sleep'
            Thread.current[:cw_thread] = nil
            worker
          end
        end
    
        def worker
          Thread.current[:cw_thread] ||= Thread.new do
            while args = queue_pop
              wipe_cache(args)
            end
          end
        end
    
        def wipe_cache(module_name)
          return if module_name.blank?
    
          update_module_identity!(module_name)
        end
      end
    end
    

这个方案对缓存的管理是比较粗旷的,对redis操作频率不高但是内存消耗比精细管理要高一些。在使用中很容易就可以找出当前缓存的关联model,通过按model定义publish_event即可实现更精细的过期控制。上面给出的只是一个比较初级的版本,顺着这个思路应该可以实现更简易的缓存管理组件。

本文只在做数据库读写分离前适用,读写分离后缓存管理要复杂得多,这里不做讨论。


欢迎跟我交流 Archfish