Python/FAQ/Обработка файлов

Материал из Wiki.crossplatform.ru

Перейти к: навигация, поиск
· Python ·

Содержание

[править] Introduction

#-----------------------------
for line in DATAFILE:
    line = line.rstrip()
    size = len(line)
    print size        # output size of line
 
 
#-----------------------------
for line in datafile:
    print length(line.rstrip())     # output size of line
#-----------------------------
lines = datafile.readlines()
 
#-----------------------------
whole_file = myfile.read()
#-----------------------------
## No direct equivalent in Python
#% perl -040 -e '$word = <>; print "First word is $word\n";'
#-----------------------------
## No direct equivalent in Python
#% perl -ne 'BEGIN { $/="%%\n" } chomp; print if /Unix/i' fortune.dat
#-----------------------------
 
print>>myfile, "One", "two", "three"  # "One two three"
print "Baa baa black sheep."         # Sent to default output file
#-----------------------------
 
buffer = myfile.read(4096)
rv = len(buffer)
#-----------------------------
myfile.truncate(length)
open("/tmp/%d.pid" % os.getpid(), "a").truncate(length)
#-----------------------------
 
pos = myfile.tell()
print "I'm", pos, "bytes from the start of DATAFILE."
#-----------------------------
logfile.seek(0, 2)   # Seek to the end
datafile.seek(pos)   # Seek to a given byte
outfile.seek(-20, 1) # Seek back 20 bytes
 
#-----------------------------
# os.write() returns the number of bytes actually written, which may be
# fewer than requested; warn when the write came up short.
written = os.write(datafile.fileno(), mystr)
if written != len(mystr):
    warnings.warn("only wrote %s bytes, not %s" % (written, len(mystr)))
#-----------------------------
pos = os.lseek(myfile.fileno(), 0, 1)       # don't change position
 
#-----------------------------

[править] Reading Lines with Continuation Characters

def ContReader(infile):
    """Yield logical records from 'infile', joining continued lines.

    Each physical line has its trailing whitespace removed.  A line that
    then ends with a backslash is a continuation: the backslash is dropped
    and the following line is glued directly onto it.  A continuation still
    pending when the input runs out is yielded as a final record.
    """
    pending = []
    for raw in infile:
        stripped = raw.rstrip()
        continued = stripped.endswith("\\")
        if continued:
            stripped = stripped[:-1]    # drop the continuation marker
        pending.append(stripped)
        if not continued:
            # record complete: hand it out and start collecting afresh
            yield "".join(pending)
            pending = []
    if pending:
        yield "".join(pending)
 
for line in ContReader(datafile):
    pass # process full record in 'line' here

[править] Counting Lines (or Paragraphs or Records) in a File

import os
count = int(os.popen("wc -l < " + filename).read())
#-----------------------------
# Count lines by letting enumerate() number them.  'count' is seeded with -1
# so that an empty file gives 0 instead of leaving 'count' unbound.
count = -1
for count, line in enumerate(open(filename)):
    pass

count += 1  # indexing is zero based
#-----------------------------
myfile = open(filename)
count = 0
for line in myfile:
    count += 1
# 'count' now holds the number of lines read
#-----------------------------
 
myfile = open(filename)
count = 0
while True:
    line = myfile.readline()
    if not line:
        break
    count += 1
#-----------------------------
 
# Count newline characters by reading the file in 64 KiB chunks.
count = 0
while True:
    s = myfile.read(2**16)
    if not s:       # empty read means EOF; without this the loop never ends
        break
    count += s.count("\n")
#-----------------------------
for line, count in zip(open(filename), xrange(1, sys.maxint)):
    pass
 
# 'count' now holds the number of lines read
#-----------------------------
import fileinput
fi = fileinput.FileInput(filename)
while fi.readline(): pass
 
count = fi.lineno()
#-----------------------------
def SepReader(infile, sep = "\n\n"):
    """Yield the pieces of 'infile' that are delimited by 'sep'.

    The file is consumed in 10000-character reads.  After each read every
    complete piece is yielded, and the unfinished tail is kept and extended
    by the next read, so a separator that straddles two reads is still
    recognized.  Nothing is yielded for an empty file.  Otherwise the text
    after the last separator is always yielded, even when it is empty.
    """
    buf = infile.read(10000)
    if not buf:
        return

    more = True
    while more:
        pieces = buf.split(sep)
        buf = pieces.pop()          # possibly incomplete tail
        for piece in pieces:
            yield piece
        chunk = infile.read(10000)
        if chunk:
            buf += chunk
        else:
            more = False
            yield buf
 
para_count = 0
for para in SepReader(open(filename)):
    para_count += 1
# FIXME: For my test case (Python-pre2.2 README from CVS) this
# returns 175 paragraphs while Perl returns 174.
#-----------------------------

[править] Processing Every Word in a File

# Visit every whitespace-separated word read from standard input.
for line in sys.stdin:
    for word in line.split():
        pass # do something with 'word'
 
#-----------------------------
pat = re.compile(r"(\w[\w'-]*)")
for line in sys.stdin:
    pos = 0
    while True:
        match = pat.search(line, pos)
        if not match:
            break
 
        pos = match.end(1)
        # do something with match.group(1)
 
# EXPERIMENTAL in the sre implementation but
# likely to be included in future (post-2.2) releases.
pat = re.compile(r"(\w[\w'-]*)")
for line in sys.stdin:
    scanner = pat.scanner(line)
    while True:
        match = scanner.search()
        if not match:
            break
 
        # do something with match.group(1)
 
 
#-----------------------------
# Make a word frequency count
import fileinput, re
pat = re.compile(r"(\w[\w'-]*)")
seen = {}
for line in fileinput.input():
    pos = 0
    while True:
        match = pat.search(line, pos)
        if not match:
            break
 
        pos = match.end(1)
        text = match.group(1).lower()
        seen[text] = seen.get(text, 0) + 1
 
# output dict in a descending numeric sort of its values
for text, count in sorted(seen.items, key=lambda item: item[1]):
    print "%5d %s" % (count, text)
 
 
#-----------------------------
# Line frequency count
import fileinput, sys
seen = {}
for line in fileinput.input():
    text = line.lower()
    seen[text] = seen.get(text, 0) + 1
 
# Output most frequent lines first.  'text' still carries its own newline,
# hence sys.stdout.write() rather than print.
for text, count in sorted(seen.items(), key=lambda item: item[1], reverse=True):
    sys.stdout.write("%5d %s" % (count, text))
 
 
#-----------------------------

[править] Reading a File Backwards by Line or Paragraph

lines = myfile.readlines()
while lines:
    line = lines.pop()
    # do something with 'line'
 
#-----------------------------
# reversed() needs a sequence and a file object is not one, so read all the
# lines into a list first.
for line in reversed(myfile.readlines()):
    pass  # do something with line
 
#-----------------------------
# Walk 'lines' from last to first by index.
for i in range(len(lines)):
    line = lines[-1 - i]    # i == 0 must select the last line, not lines[0]
#-----------------------------
# Visit paragraphs last-to-first.  SepReader is a generator, so its output
# is collected into a list before being reversed.
for paragraph in reversed(list(SepReader(infile))):
    pass # do something
 
#-----------------------------

[править] Trailing a Growing File

import time
while True:
    for line in infile:
        pass # do something with the line
 
    time.sleep(SOMETIME)
    infile.seek(0, 1)
#-----------------------------
import time
naptime = 1
 
logfile = open("/tmp/logfile")
while True:
    for line in logfile:
        print line.rstrip()
    time.sleep(naptime)
    infile.seek(0, 1)
 
#-----------------------------
import time

# Remember the offset after the last complete read so that each pass resumes
# exactly where the previous one stopped.
while True:
    curpos = logfile.tell()
    while True:
        line = logfile.readline()
        if not line:
            break

        curpos = logfile.tell()
    time.sleep(naptime)
    logfile.seek(curpos, 0)  # seek to where we had been
#-----------------------------
import os
# Stop trailing once the file has been removed.  The link count has to come
# from the open handle: os.stat() on a path that no longer exists raises
# OSError instead of reporting zero links.
if os.fstat(logfile.fileno()).st_nlink == 0:
    raise SystemExit
#-----------------------------

[править] Picking a Random Line from a File

import random, fileinput
text = None
for line in fileinput.input():
    if random.randrange(fileinput.lineno()) == 0:
        text = line
 
# 'text' is the random line
#-----------------------------
# XXX is the perl code correct?  Where is the fortunes file opened?
import sys
adage = None
for i, rec in enumerate(SepReader(open("/usr/share/games/fortunes"), "%\n")):
    if random.randrange(i+1) == 0:
        adage = rec
 
print adage
#-----------------------------

[править] Randomizing All Lines

import random
lines = data.readlines()
random.shuffle(lines)
for line in lines:
    print line.rstrip()
 
#-----------------------------

[править] Reading a Particular Line in a File

# using efficient caching system
import linecache
linecache.getline(filename, DESIRED_LINE_NUMBER)
 
# or doing it more oldskool
lineno = 0
while True:
    line = infile.readline()
    if not line or lineno == DESIRED_LINE_NUMBER:
        break
 
    lineno += 1
#-----------------------------
lines = infile.readlines()
line = lines[DESIRED_LINE_NUMBER]
#-----------------------------
for i in range(DESIRED_LINE_NUMBER):
    line = infile.readline()
    if not line:
        break
 
#-----------------------------
 
## Not sure what this thing is doing.  Allow fast access to a given
## line number?
 
# usage: build_index(*DATA_HANDLE, *INDEX_HANDLE)

[править] Processing Variable-Length Text Fields

# given $RECORD with field separated by PATTERN,
# extract @FIELDS.
fields = re.split(pattern_string, text)
#-----------------------------
 
pat = re.compile(pattern_string)
fields = pat.split(text)
#-----------------------------
re.split(r"([+-])", "3+5-2")
#-----------------------------
['3', '+', '5', '-', '2']
#-----------------------------
 
fields = record.split(":")
#-----------------------------
fields = re.split(r":", record)
#-----------------------------
fields = re.split(r"\s+", record)
#-----------------------------
# Split on runs of whitespace, ignoring leading and trailing whitespace.
# split(" ") would instead break at every single space and produce empty
# fields for consecutive spaces.
fields = record.split()
 
#-----------------------------

[править] Removing the Last Line of a File

# Find the byte offset at which the last line starts, then truncate the file
# at that offset.  The file is read in binary mode so that tell() reports
# plain byte offsets, and both handles are closed explicitly.
myfile = open(filename, "rb")
prev_pos = pos = 0
while True:
    line = myfile.readline()
    if not line:
        break

    prev_pos = pos          # start of the line just read
    pos = myfile.tell()     # start of the next line
myfile.close()

myfile = open(filename, "a")
myfile.truncate(prev_pos)
myfile.close()
#-----------------------------

[править] Processing Binary Files

open(filename, "rb")
open(filename, "wb")
 
#-----------------------------
gifname = "picture.gif"
gif_file = open(gifname, "rb")
 
# Don't think there's an equivalent for these in Python
#binmode(GIF);               # now DOS won't mangle binary input from GIF
#binmode(STDOUT);            # now DOS won't mangle binary output to STDOUT
 
 
#-----------------------------
# Copy the GIF opened above as 'gif_file' to standard output in 8 KiB chunks.
while True:
    buff = gif_file.read(8 * 2**10)
    if not buff:
        break
    sys.stdout.write(buff)
#-----------------------------

[править] Using Random-Access I/O

address = recsize * recno
myfile.seek(address, 0)
buffer = myfile.read(recsize)
#-----------------------------
address = recsize * (recno-1)
#-----------------------------

[править] Updating a Random-Access File

import os
address = recsize * recno
myfile.seek(address)

buffer = myfile.read(recsize)
# ... work with the buffer, then turn it back into a string and ...
# Step back over the record just read so the write replaces it in place.
# os.SEEK_CUR stands in for the constant from the deprecated posixfile module.
myfile.seek(-recsize, os.SEEK_CUR)
myfile.write(buffer)
myfile.close()
#-----------------------------
## Not yet implemented
# weekearly -- set someone's login date back a week
# @@INCOMPLETE@@

[править] Reading a String from a Binary File

## Note: this isn't optimal -- the 's+=c' may go O(N**2) so don't
 
## use for large strings.
myfile.seek(addr)
s = ""
while True:
    c = myfile.read(1)
    if not c or c == "\0":
        break
 
    s += c
#-----------------------------
# Locate the terminating NUL by scanning in 1000-byte chunks, then re-read
# exactly the string in one call.
myfile.seek(addr)
offset = 0      # number of bytes between 'addr' and the NUL (or EOF)
while True:
    s = myfile.read(1000)
    x = s.find("\0")
    if x != -1:
        offset += x
        break
    offset += len(s)
    if len(s) != 1000:  # EOF
        break
myfile.seek(addr)
s = myfile.read(offset)     # 'offset' bytes precede the NUL; keep them all
myfile.read(1)              # consume the NUL terminator itself
 
#-----------------------------
## Not Implemented
# bgets - get a string from an address in a binary file
#-----------------------------
#!/usr/bin/python
# strings - pull strings out of a binary file
import re, sys
 
 
## Assumes SepReader from above
 
pat = re.compile(r"([\040-\176\s]{4,})")
for block in SepReader(sys.stdin, "\0"):
    pos = 0
    while True:
        match = pat.search(block, pos)
        if not match:
            break
 
        print match.group(1)
        pos = match.end(1)
#-----------------------------

[править] Reading Fixed-Length Records

# RECORDSIZE is the length of a record, in bytes.
# TEMPLATE is the unpack template for the record
# FILE is the file to read from
# FIELDS is a tuple, one element per field
 
import struct
RECORDSIZE = struct.calcsize(TEMPLATE)
while True:
    record = FILE.read(RECORDSIZE)
    if not record:                      # clean end of file
        break
    if len(record) != RECORDSIZE:       # file ended in the middle of a record
        raise IOError("short read")
    FIELDS = struct.unpack(TEMPLATE, record)
 
# ----

[править] Reading Configuration Files

# NOTE: to parse INI files, see the standard ConfigParser module.
import re
# Parse "name = value" lines into User_Preferences, ignoring '#' comments
# and blank lines.
pat = re.compile(r"\s*=\s*")
for line in config_file:
    if "#" in line:         # no comments
        line = line[:line.index("#")]
    line = line.strip()     # no leading or trailing white
    if not line:            # anything left?
        continue

    m = pat.search(line)
    if m is None:
        # No "=" on this line: record the bare name with no value rather
        # than failing on the missing match.
        var, value = line, None
    else:
        var = line[:m.start()]
        value = line[m.end():]
    User_Preferences[var] = value

[править] Testing a File for Trustworthiness

import os
 
mode, ino, dev, nlink, uid, gid, size, \
atime, mtime, ctime = os.stat(filename)
 
mode &= 07777               # discard file type info
 
#-----------------------------
info = os.stat(filename)
if info.st_uid == 0:
    print "Superuser owns", filename
 
if info.st_atime > info.st_mtime:
    print filename, "has been read since it was written."
#-----------------------------
import os
def is_safe(path):
    """Return True if 'path' cannot be modified by an untrusted user.

    The path is considered safe when it is owned by the superuser or by the
    real uid of this process and it is not writable by group or others.
    The one exception is a group/other-writable directory that has the
    sticky bit set, which is also accepted.

    Raises OSError if 'path' cannot be stat'ed.
    """
    import stat

    info = os.stat(path)

    # owner is neither the superuser nor me (os.getuid() is the real uid)
    if info.st_uid not in (0, os.getuid()):
        return False

    # check whether group or other can write file.
    # (add S_IRGRP | S_IROTH to the mask to detect reading as well)
    if info.st_mode & (stat.S_IWGRP | stat.S_IWOTH):
        # Decide from the mode already fetched instead of asking the
        # filesystem again, so both tests describe the same object.
        if not stat.S_ISDIR(info.st_mode):  # non-directories aren't safe
            return False
        # but directories with the sticky bit are
        if not (info.st_mode & stat.S_ISVTX):
            return False

    return True
#-----------------------------
## XXX What is '_PC_CHOWN_RESTRICTED'?
 
def is_verysafe(path):
    """Return True if 'path' and every directory above it pass is_safe().

    The path is first made absolute and normalized, so relative paths and
    paths with a trailing slash are checked in full.  The walk then goes
    from the path itself up to and including the filesystem root, stopping
    at the first component that is not safe.
    """
    path = os.path.abspath(path)
    while True:
        if not is_safe(path):
            return False

        parent = os.path.dirname(path)
        if parent == path:      # dirname of the root is the root: done
            return True
        path = parent
#-----------------------------
 
# Program: tctee
# Not Implemented (requires reimplementing Perl's builtin '>>', '|',
# etc. semantics)

[править] Program: tailwtmp

#!/usr/bin/python
 
# tailwtmp - watch for logins and logouts;
# uses linux utmp structure, from /usr/include/bits/utmp.h
 
# /* The structure describing an entry in the user accounting database.  */
# struct utmp
# {
#   short int ut_type;            /* Type of login.  */
#   pid_t ut_pid;                 /* Process ID of login process.  */
#   char ut_line[UT_LINESIZE];    /* Devicename.  */
 
#   char ut_id[4];                /* Inittab ID.  */
#   char ut_user[UT_NAMESIZE];    /* Username.  */
#   char ut_host[UT_HOSTSIZE];    /* Hostname for remote login.  */
#   struct exit_status ut_exit;   /* Exit status of a process marked
#                                    as DEAD_PROCESS.  */
#   long int ut_session;          /* Session ID, used for windowing.  */
#   struct timeval ut_tv;         /* Time entry was made.  */
#   int32_t ut_addr_v6[4];        /* Internet address of remote host.  */
#   char __unused[20];            /* Reserved for future use.  */
 
# };
 
# /* Values for the `ut_type' field of a `struct utmp'.  */
# #define EMPTY       0   /* No valid user accounting information.  */
# 
# #define RUN_LVL     1   /* The system's runlevel.  */
# #define BOOT_TIME   2   /* Time of system boot.  */
# #define NEW_TIME    3   /* Time after system clock changed.  */
# #define OLD_TIME    4   /* Time when system clock changed.  */
 
# 
# #define INIT_PROCESS    5   /* Process spawned by the init process.  */
# #define LOGIN_PROCESS   6   /* Session leader of a logged in user.  */
# #define USER_PROCESS    7   /* Normal process.  */
# #define DEAD_PROCESS    8   /* Terminated process.  */
# 
# #define ACCOUNTING  9
 
import time
 
import struct
import os
 
class WTmpRecord:
    """Decoder for one fixed-size binary record of a wtmp-style file.

    After unpack() has been called, the decoded values are available as
    attributes named after the entries of fieldnames(), e.g. rec.User.
    """
    # struct layout of one record.
    # NOTE(review): the format uses native sizes and alignment, so the width
    # of 'l' varies between platforms; confirm that size() matches the real
    # record size on the target system.
    fmt = "hI32s4s32s256siili4l20s"
    # Names for the first unpacked values, in unpack order.
    _fieldnames = ["type","PID","Line","inittab","User","Hostname",
                    "exit_status", "session", "time", "addr" ]
    def __init__(self):
        self._rec_size = struct.calcsize(self.fmt)
    def size(self):
        """Return the number of bytes in one packed record."""
        return self._rec_size
    def unpack(self, bin_data):
        """Decode 'bin_data' (exactly size() bytes) and return the value list.

        The values are also kept on the instance for attribute access.
        Raises struct.error if 'bin_data' has the wrong length.
        """
        rec = struct.unpack(self.fmt, bin_data)
        self._rec = []
        for i, value in enumerate(rec):
            if i in (2,3,4,5):
                # string fields are NUL padded: keep the text before the
                # first NUL only
                value = value.split("\0")[0]
            self._rec.append(value)
        return self._rec
    def fieldnames(self):
        """Return the list of field names usable as attributes."""
        return self._fieldnames
    def __getattr__(self,name):
        # Only reached when normal attribute lookup fails.  '_rec' is read
        # through __dict__ because 'self._rec' would re-enter this method
        # (and recurse without end) when no record has been unpacked yet.
        if name not in self._fieldnames:
            raise AttributeError(name)
        try:
            rec = self.__dict__["_rec"]
        except KeyError:
            raise AttributeError("%s is not available before unpack()" % name)
        return rec[self._fieldnames.index(name)]
 
rec = WTmpRecord()
f = open("/var/log/wtmp","rb")
f.seek(0,2)
 
while True:
    while True:
        bin = f.read(rec.size())
        if len(bin) != rec.size():
            break
        rec.unpack(bin)
        if rec.type != 0:
            print " %1d %-8s %-12s %-24s %-20s %5d %08x" % \
                (rec.type, rec.User, rec.Line, 
                 time.strftime("%a %Y-%m-%d %H:%M:%S",time.localtime(rec.time)),
                 rec.Hostname, rec.PID, rec.addr)
    time.sleep(1)
f.close()

[править] Program: tctee

# @@INCOMPLETE@@
 
# @@INCOMPLETE@@

[править] Program: laston

#!/usr/bin/python
# laston - find out when given user last logged on
import sys
import struct
import pwd
import time
 
import re
 
f = open("/var/log/lastlog","rb")
 
fmt = "L32s256s"
rec_size = struct.calcsize(fmt)
 
for user in sys.argv[1:]:
    if re.match(r"^\d+$", user):
        user_id = int(user)
    else:
 
        try:
            user_id = pwd.getpwnam(user)[2]
        except:
            print "no such uid %s" % (user)
            continue
    f.seek(rec_size * user_id)
    bin = f.read(rec_size)
    if len(bin) == rec_size:
        data = struct.unpack(fmt, bin)
        if data[0]:
            logged_in = "at %s" % (time.strftime("%a %H:%M:%S %Y-%m-%d",
                                    time.localtime(data[0])))
            line = " on %s" % (data[1])
            host = " from %s" % (data[2])
        else:
 
            logged_in = "never logged in"
            line = ""
            host = ""
        print "%-8s UID %5d %s%s%s" % (user, user_id, logged_in, line, host)
    else:
 
        print "Read failed."
f.close()