6.6.2 Sending files


To get the log data to our log processors, we'll have two different components operating on the data. The first is a script that takes the log files and puts them in Redis under named keys, publishes the names of the keys to a chat channel using our group chat method from section 6.5.2, and then waits for notification that the files have been processed (so that we don't use more memory than our Redis machine has). Specifically, it waits until the key named after the stored file's key with ':done' appended holds the value 10, which is our number of aggregation processes. The function that copies logs and cleans up after itself is shown in the following listing.

Listing 6.30 The copy_logs_to_redis() function
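import os
import time
from collections import deque

# create_chat() and send_message() are the group chat helpers from section 6.5.2.

def copy_logs_to_redis(conn, path, channel, count=10,
                       limit=2**30, quit_when_done=True):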
   bytes_in_redis = 0
   waiting = deque()
 
   create_chat(conn, 'source', map(str, range(count)), '', channel)

Create the chat that will be used to send messages to clients.

   count = str(count)
 
   for logfile in sorted(os.listdir(path)):

Iterate over all of the log files.

      full_path = os.path.join(path, logfile)

 
      fsize = os.stat(full_path).st_size
 
      while bytes_in_redis + fsize > limit:
         cleaned = _clean(conn, channel, waiting, count)
         if cleaned:
            bytes_in_redis -= cleaned
         else:
            time.sleep(.25)

Clean out finished files if we need more room.

      with open(full_path, 'rb') as inp:
         block = ' '
         while block:
            block = inp.read(2**17)
            conn.append(channel+logfile, block)

Upload the file to Redis.

      send_message(conn, channel, 'source', logfile)

Notify the listeners that the file is ready.

      bytes_in_redis += fsize
      waiting.append((logfile, fsize))

Update our local information about Redis’ memory use.

   if quit_when_done:
      send_message(conn, channel, 'source', ':done')

We are out of files, so signal that it’s done.

   while waiting:
      cleaned = _clean(conn, channel, waiting, count)
      if cleaned:
         bytes_in_redis -= cleaned
      else:
         time.sleep(.25)

Wait for the remaining files to be processed, cleaning up each one as it finishes.

def _clean(conn, channel, waiting, count):
   if not waiting:
      return 0
   w0 = waiting[0][0]
   if conn.get(channel + w0 + ':done') == count:
      conn.delete(channel + w0, channel + w0 + ':done')
      return waiting.popleft()[1]
   return 0

How we actually perform the cleanup from Redis.
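
To make the cleanup handshake concrete, the following is a minimal sketch that runs without a Redis server. FakeConn is a hypothetical in-memory stand-in that implements only the get, incr, and delete calls used here and returns strings, as a redis-py client created with decode_responses=True would. The sketch also assumes that each log processor acknowledges a file by incrementing the channel + logfile + ':done' counter, which this section implies but doesn't show.

```python
from collections import deque


class FakeConn:
    """Hypothetical in-memory stand-in for a Redis connection (not redis-py)."""

    def __init__(self):
        self.data = {}

    def get(self, key):
        return self.data.get(key)

    def incr(self, key):
        value = int(self.data.get(key, '0')) + 1
        self.data[key] = str(value)
        return value

    def delete(self, *keys):
        return sum(1 for key in keys if self.data.pop(key, None) is not None)


def _clean(conn, channel, waiting, count):
    # Same logic as listing 6.30: only the oldest waiting file is checked.
    if not waiting:
        return 0
    w0 = waiting[0][0]
    if conn.get(channel + w0 + ':done') == count:
        conn.delete(channel + w0, channel + w0 + ':done')
        return waiting.popleft()[1]
    return 0


conn = FakeConn()
channel = 'logs:'
count = str(2)                              # two processors in this toy run
conn.data[channel + 'a.log'] = 'x' * 100    # pretend the file was uploaded
waiting = deque([('a.log', 100)])

conn.incr(channel + 'a.log:done')           # assumed: first processor acknowledges
freed_early = _clean(conn, channel, waiting, count)

conn.incr(channel + 'a.log:done')           # assumed: second processor acknowledges
freed_late = _clean(conn, channel, waiting, count)

print(freed_early, freed_late, len(waiting))
```

Nothing is freed until every processor has acknowledged the file. Because only the head of the queue is checked, files are released in the order they were uploaded, even if a later file finishes first.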

 

Copying logs to Redis involves many detailed steps, most of them concerned with not putting too much data into Redis at one time and with cleaning up properly once a file has been read by all clients. Notifying log processors that a new file is ready is the easy part; the setup, sending, and cleanup take most of the work.
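
The upload step from the listing can also be tried in isolation. This sketch assumes Python 3 and uses a hypothetical FakeConn whose append mimics the Redis APPEND command by creating the key if it is missing and returning the new length. The name upload_in_blocks is illustrative and is not a function from the listing. Unlike the listing, the loop stops before appending the final empty block.

```python
import io


class FakeConn:
    """Hypothetical in-memory stand-in for a Redis connection."""

    def __init__(self):
        self.data = {}

    def append(self, key, value):
        # Like APPEND: create the key if missing, return the new length.
        self.data[key] = self.data.get(key, b'') + value
        return len(self.data[key])


def upload_in_blocks(conn, key, inp, block_size=2**17):
    """Copy a binary file-like object into one key, block by block."""
    total = 0
    while True:
        block = inp.read(block_size)
        if not block:               # an empty read means end of file
            break
        total = conn.append(key, block)
    return total


conn = FakeConn()
payload = b'line\n' * 100_000       # 500,000 bytes, spans four 128 KB blocks
size = upload_in_blocks(conn, 'logs:a.log', io.BytesIO(payload))
print(size)
```

Reading in 128 KB blocks keeps the uploader's own memory use small regardless of file size, while the returned length gives a cheap check that the whole file arrived.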