
Python from Beginner to Master (V): File Processing - 01 - File I/O


1. File reading and writing

Three things deserve attention when reading and writing files: 1) use a with context manager; 2) newline handling; 3) encoding (the system default encoding can be obtained with sys.getdefaultencoding()). If you want to bypass the text encoding layer, you can access the underlying buffer attribute directly, e.g. sys.stdout.buffer.write().
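A minimal sketch of these three points, assuming a UTF-8 locale (the file name notes.txt is just a placeholder):

import sys

print(sys.getdefaultencoding())   # usually 'utf-8'

# with closes the file even if an exception is raised;
# encoding is set explicitly, newline='' turns off newline translation
with open('notes.txt', 'w', encoding='utf-8', newline='') as f:
    f.write('first line\n')

# Bypass the text layer and write pre-encoded bytes to stdout
sys.stdout.buffer.write('raw bytes\n'.encode('utf-8'))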

1.1 Reading and writing text files

The mode string passed to open() combines an access mode with a format: r for read, w for write (plus a for append), and t for text or b for binary. gzip.open() and bz2.open() accept the same 't'/'b' suffixes for compressed files. A short sketch after the next block shows a few more combinations.

file_name = 'test.txt'  # a relative path is resolved against the current directory

# The file may not exist yet, so create it only if it is missing
import os
if not os.path.exists(file_name):
    with open(file_name, 'wt') as f:
        f.write('Hello, I am a test.\n')
else:
    print(f'File {file_name} already exists!')

# Read text
with open(file_name, 'rt') as f:
    f.read()
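A few more mode combinations, as a quick sketch (reusing the test.txt / test.bin names from this article):

# Append text to the end of the file
with open('test.txt', 'at') as f:
    f.write('one more line\n')

# 'x' raises FileExistsError instead of overwriting an existing file
try:
    with open('test.txt', 'xt') as f:
        f.write('only written if the file did not exist\n')
except FileExistsError:
    pass

# Append raw bytes in binary mode
with open('test.bin', 'ab') as f:
    f.write(b'\x00\x01')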

1.2 Reading and writing binary data

# Binary data
b = b'Hello World'
print(f'binary object b[0] = {b[0]}')

# In binary mode, text must be encoded on write and decoded on read
with open('test.bin', 'wb') as f:
    text = 'Hello World'
    f.write(text.encode('utf-8'))

with open('test.bin', 'rb') as f:
    data = f.read(16)
    text = data.decode('utf-8')

import array
a_obj = array.array('i', [0, 0, 0, 0, 0, 0, 0, 0])
with open('test.bin', 'rb') as f:
    # readinto() fills the array's memory directly; the byte layout is
    # platform dependent (endianness, item size), so use it with care
    f.readinto(a_obj)

1.3 Reading and writing compressed files

import gzip
import bz2

gz_file, bz_file = "giztext.gz", "bz.gz"
text = 'Hello World\n'

# gzip: write text
with gzip.open(gz_file, 'wt') as f:
    f.write(text)

# gzip: read text
with gzip.open(gz_file, 'rt') as f:
    text = f.read()

# bz2: write text
with bz2.open(bz_file, 'wt') as f:
    f.write(text)

# bz2: read text
with bz2.open(bz_file, 'rt') as f:
    text = f.read()

# Choose a compression level (1 = fastest, 9 = smallest output)
with gzip.open(gz_file, 'wt', compresslevel=3) as f:
    f.write(text)

1.4 File encoding

import urllib.request
import io

# Wrap a binary file object in a text encoding layer
url_res = urllib.request.urlopen('http://www.python.org')
f_test = io.TextIOWrapper(url_res, encoding='utf-8')
text_val = f_test.read()

# To change the encoding of an already open text-mode file,
# first strip the current text layer with detach()
import sys
print(f'sys stdout encoding is: {sys.stdout.encoding}')      #utf-8
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='latin-1')
print(f'sys stdout new encoding is: {sys.stdout.encoding}')  #latin-1

# The layered I/O system: text layer -> buffered layer -> raw file
file_read = open('sample.txt', 'w')
print(f'file read: {file_read}')                   #<_io.TextIOWrapper name='sample.txt' mode='w' encoding='UTF-8'>
print(f'file buffer: {file_read.buffer}')          #<_io.BufferedWriter name='sample.txt'>
print(f'file buffer raw: {file_read.buffer.raw}')  #<_io.FileIO name='sample.txt' mode='wb' closefd=True>

1.5 Reading fixed-size records

from functools import partial

RECORD_SIZE = 32

with open('somefile.data', 'rb') as f:
    # iter() with a sentinel stops as soon as read() returns b''
    records = iter(partial(f.read, RECORD_SIZE), b'')
    for r in records:
        pass  # process each fixed-size record here

1.6 Creating temporary files

from tempfile import TemporaryFile

# TemporaryFile: an anonymous temporary file with no visible name on the filesystem
# NamedTemporaryFile: a temporary file that does have a real name (see below)
with TemporaryFile('w+t') as f:
    # Read/write to the file
    f.write('Hello World\n')
    f.write('Testing\n')

    # Seek back to beginning and read the data
    f.seek(0)
    data = f.read()

f = TemporaryFile('w+t')
# Use the temporary file
f.close()

# ---------------------------------------------------
from tempfile import NamedTemporaryFile

with NamedTemporaryFile('w+t') as f:
    print('filename is:', f.name)

# delete=False keeps the file on disk after it is closed
with NamedTemporaryFile('w+t', delete=False) as f:
    print('filename is:', f.name)

# ---------------------------------------------------
from tempfile import TemporaryDirectory
with TemporaryDirectory() as dirname:
    print('dirname is:', dirname)
    # Use the directory  #/var/folders/h1/jwyy02nd1hg5p0_pgxg_9w3c0000gn/T/tmp_3lwonjh

import tempfile
print(tempfile.mkstemp())     #(4, '/var/folders/h1/jwyy02nd1hg5p0_pgxg_9w3c0000gn/T/tmpi_hjdkd0')
print(tempfile.gettempdir())  #/var/folders/h1/jwyy02nd1hg5p0_pgxg_9w3c0000gn/T

f = NamedTemporaryFile(prefix='mytemp', suffix='.txt', dir='/tmp')
print(f.name)  #/tmp/mytempng2rx_bg.txt

1.7 Wrapping file descriptors

A file descriptor is an integer handle that refers to a system-level I/O channel. It can be wrapped into a proper file object with open(), or, for sockets, with makefile(); makefile() is less efficient than wrapping the descriptor with open() directly, but it is portable across platforms. On Unix systems the same trick can be used to wrap a pipe; a small pipe sketch follows the socket example below.

import os

file_data = os.open('test.txt', os.O_WRONLY | os.O_CREAT)

# Turn the raw descriptor into a proper file object
test_file = open(file_data, 'wt')
test_file.write('hello world\n')
test_file.close()


from socket import socket, AF_INET, SOCK_STREAM

def echo_client(client_sock, addr):
    print(f'Got connection from {addr}')

    # Make text-mode file wrappers for socket reading/writing
    client_in = open(client_sock.fileno(), 'rt', encoding='latin-1',
                     closefd=False)
    client_out = open(client_sock.fileno(), 'wt', encoding='latin-1',
                      closefd=False)

    # Echo lines back to the client using file I/O
    for line in client_in:
        client_out.write(line)
        client_out.flush()

    client_sock.close()

def echo_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(address)
    sock.listen(1)
    while True:
        client, addr = sock.accept()
        echo_client(client, addr)


import sys
bstd_out = open(sys.stdout.fileno(), 'wb', closefd=False)
bstd_out.write(b'Hello World\n')
bstd_out.flush()
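To illustrate the pipe case mentioned above, here is a minimal sketch (the message text is made up) that wraps both descriptors returned by os.pipe() as ordinary file objects:

import os

read_fd, write_fd = os.pipe()

# Wrap the write end as a text-mode file; closing it lets the reader see EOF
with open(write_fd, 'wt') as w:
    w.write('hello through a pipe\n')

with open(read_fd, 'rt') as r:
    print(r.read())   # hello through a pipe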

2. File operations

2.1 Paths

import os

csv_path = '/usr/test/Data/test.csv'
print(f'{csv_path} base name is: {os.path.basename(csv_path)}')  #test.csv
print(f'{csv_path} dir name is: {os.path.dirname(csv_path)}')    #/usr/test/Data
print(f"new path: {os.path.join('tmp', 'data', os.path.basename(csv_path))}")  #tmp/data/test.csv
csv_path = '~/Data/test.csv'
print(f'path expand user is: {os.path.expanduser(csv_path)}')  #/Users/liudong/Data/test.csv
print(f'{csv_path} splitext is: {os.path.splitext(csv_path)}')  #('~/Data/test', '.csv')

2.2 Testing for existence

# Note that permissions matter: exists() can return False when the path
# cannot be reached by the current process, not only when it is truly absent
import os
file_path = '/etc/passwd'
test_path = '/etc/test'
print(f"is {file_path} exists: {os.path.exists(file_path)}")
print(f"is {test_path} exists: {os.path.exists(test_path)}")

2.3 Listing a directory

import os

file_path = '/etc'
# List all the entries in the directory
name_list = os.listdir(file_path)
print(f'file list of etc is:\n{name_list}')

# Filtering the listing
import os.path
dir_name_list = [name for name in os.listdir(file_path)
                 if os.path.isdir(os.path.join(file_path, name))]
py_file_list = [name for name in os.listdir(file_path)
                if name.endswith('.py')]

import os.path
import glob
py_file_list = glob.glob('*.py')
# Get file sizes and modification dates
name_sz_date = [(name, os.path.getsize(name), os.path.getmtime(name))
                for name in py_file_list]
for name, size, mtime in name_sz_date:
    print(f'name={name}, size={size}, mtime={mtime}')

# Alternative: get file metadata via os.stat()
file_metadata = [(name, os.stat(name)) for name in py_file_list]
for name, meta in file_metadata:
    print(f'name={name}, size={meta.st_size}, mtime={meta.st_mtime}')

3. In-memory operations on files

3.1 Memory-mapping files

import os
import mmap

def memory_map(file_name, access=mmap.ACCESS_WRITE):
    size_val = os.path.getsize(file_name)
    fd = os.open(file_name, os.O_RDWR)
    return mmap.mmap(fd, size_val, access=access)


# Create a file of a given size, filled with zero bytes
size = 1000000
with open('test_data', 'wb') as f:
    f.seek(size - 1)
    f.write(b'\x00')


m = memory_map('test_data')
print(f'the len of m is: {len(m)}')  #1000000
print(f'm split: {m[0:10]}')         #b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
print(f'm[0] is: {m[0]}')            #0
m[0:11] = b'Hello World'
print(f'close result: {m.close()}')  #None

with open('test_data', 'rb') as f:
    print(f'read content: {f.read(11)}')  #b'Hello World'

# A memoryview cast reinterprets the mapped bytes, here as unsigned ints
m = memory_map('test_data')
v = memoryview(m).cast('I')
v[0] = 7
print(f'point content from m is: {m[0:4]}')  #b'\x07\x00\x00\x00'
m[0:4] = b'\x07\x01\x00\x00'
print(f'v[0] = {v[0]}')  #263

3.2 Reading binary data into an existing buffer

import os.path


def read_into_buffer(file_name):
    buf = bytearray(os.path.getsize(file_name))
    with open(file_name, 'rb') as f:
        # readinto() fills an existing buffer instead of allocating a new
        # object the way read() does, which avoids a lot of memory churn.
        # Caveat: always check that the number of bytes actually read
        # matches the size of the buffer.
        f.readinto(buf)
    return buf


with open('test_file.bin', 'wb') as f:
    f.write(b'Hello World')
buf_read = read_into_buffer('test_file.bin')
print(f'buf read is: {buf_read}')
buf_read[0:5] = b'Hello'
print(f'buf read is: {buf_read}')
with open('new_test_file.bin', 'wb') as f:
    f.write(buf_read)


# Size of each record (adjust the value to your format)
record_size = 32
# Stop as soon as readinto() returns fewer bytes than the record size
buf_read = bytearray(record_size)
with open('test_file.bin', 'rb') as f:
    while True:
        n = f.readinto(buf_read)
        if n < record_size:
            break


print(f'buf read is: {buf_read}')
memory_val = memoryview(buf_read)  # a zero-copy view of the buffer
memory_val = memory_val[-5:]
print(f'memory value is: {memory_val}')
memory_val[:] = b'WORLD'
print(f'buf read is: {buf_read}')

3.3 Serial ports and serialization

# pyserial (a third-party package) for talking to serial ports
import serial

ser = serial.Serial('/dev/tty.usbmodem641',  # Device name varies
                    baudrate=9600,
                    bytesize=8,
                    parity='N',
                    stopbits=1)

ser.write(b'G1 X50 Y50\r\n')
resp = ser.readline()
import pickle

# Sample output from the examples below:
# file load is [1, 6, 3, 9]
# file load is hello,world!
# file load is {'go', 'java', 'python'}
# pickle function: b'\x80\x04\x95\x10\x00\x00\x00\x00\x00\x00\x00\x8c\x04math\x94\x8c\x03cos\x94\x93\x94.'
# T-minus is: 30
# T-minus is: 29
# load result: <__main__.Countdown object at 0x1037da850>
data_obj = ...  # Some Python object
test_file = open('test_file', 'wb')
pickle.dump(data_obj, test_file)  # dump() and load() are inverse operations
p_con = pickle.dumps(data_obj)

# Restore from a file
test_file = open('test_file', 'rb')
data_obj = pickle.load(test_file)

# Restore from a byte string
data_obj = pickle.loads(p_con)


# Multiple objects can be dumped into, and loaded back from, one file
import pickle
test_file = open('some_data', 'wb')
pickle.dump([1, 6, 3, 9], test_file)
pickle.dump('hello,world!', test_file)
pickle.dump({'python', 'java', 'go'}, test_file)
test_file.close()
test_file = open('some_data', 'rb')
print(f'file load is {pickle.load(test_file)}')
print(f'file load is {pickle.load(test_file)}')
print(f'file load is {pickle.load(test_file)}')


# Even functions can be pickled (by reference to their module and name)
import math
import pickle
print(f'pickle function: {pickle.dumps(math.cos)}')


import time
import threading

class Countdown:
    """Objects that hold references to system resources (here, a thread)
    cannot be pickled directly; define __getstate__ and __setstate__ to
    control what gets serialized and how the object is rebuilt."""
    def __init__(self, n):
        self.n = n
        self.thr = threading.Thread(target=self.run)
        self.thr.daemon = True
        self.thr.start()

    def run(self):
        while self.n > 0:
            print(f'T-minus is: {self.n}')
            self.n -= 1
            time.sleep(5)

    def __getstate__(self):
        return self.n

    def __setstate__(self, n):
        self.__init__(n)


count_down = Countdown(30)

test_file = open('test.p', 'wb')
import pickle
pickle.dump(count_down, test_file)
test_file.close()


test_file = open('test.p', 'rb')
print(f'load result: {pickle.load(test_file)}')

4. Encoding and decoding

4.1 Base64

import base64

s_obj = b'hello'

code_obj = base64.b64encode(s_obj)
print(f'b64 encode {s_obj} = {code_obj}')  #b'aGVsbG8='

print(f'decode {code_obj} = {base64.b64decode(code_obj)}')  #b'hello'

# Decode to str when you need text rather than bytes
code_obj = base64.b64encode(s_obj).decode('ascii')
print(f'encode decode {s_obj} = {code_obj}')  #aGVsbG8=

4.2 Hexadecimal (base16)

import binascii

s = b'hello'

h = binascii.b2a_hex(s)
print(f'base: {h}')  #b'68656c6c6f'
print(f'b2a hex: {binascii.a2b_hex(h)}')  #b'hello'


import base64
h = base64.b16encode(s)
print(f'base: {h}')  #b'68656C6C6F'
print(f'b16 decode: {base64.b16decode(h)}')  #b'hello'


h = base64.b16encode(s)
print(f'base: {h}')  #b'68656C6C6F'
print(f"decode: {h.decode('ascii')}")  #68656C6C6F

5. Advanced operations

5.1 Copying and moving files and directories

import shutil

# The main caveat with shutil is that file metadata is not fully preserved:
# copy2() keeps timestamps and permission bits, but things such as owners
# and ACLs may be lost.

# Copy src to dst. (cp src dst)
shutil.copy(src, dst)

# Copy files, but preserve metadata (cp -p src dst)
shutil.copy2(src, dst)

# Copy directory tree (cp -R src dst)
shutil.copytree(src, dst)

# Move src to dst (mv src dst)
shutil.move(src, dst)

# Copy symlinks as symlinks instead of following them
shutil.copytree(src, dst, symlinks=True)

# Ignore files selected by a callable...
def ignore_pyc_files(dirname, filenames):
    return [name for name in filenames if name.endswith('.pyc')]
shutil.copytree(src, dst, ignore=ignore_pyc_files)

# ...or by glob patterns
shutil.copytree(src, dst, ignore=shutil.ignore_patterns('*~', '*.pyc'))

try:
    shutil.copytree(src, dst)
except shutil.Error as e:
    for src, dst, msg in e.args[0]:
        # src is source name
        # dst is destination name
        # msg is error message from exception
        print(dst, src, msg)
import os.path

# os.path helpers for taking file paths apart and putting them back together
file_name = '/davanced_programming/chapter13/spam.py'
print(f'base name is: {os.path.basename(file_name)}')
print(f'dir name is: {os.path.dirname(file_name)}')
print(f'file split: {os.path.split(file_name)}')
print(os.path.join('/new/dir', os.path.basename(file_name)))
print(os.path.expanduser('~/chapter13/spam.py'))

5.2 Archive files

If you need finer control over the details, use the tarfile, zipfile, gzip, or bz2 modules directly; shutil's archive functions are only a thin wrapper around them. A direct zipfile sketch follows the shutil example below.

import shutil

shutil.unpack_archive('py38.zip')
shutil.make_archive('py38', 'zip', 'test_zip')

print(shutil.get_archive_formats())  # List the supported archive formats
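For comparison, a minimal sketch that uses zipfile directly; the archive name reuses py38.zip from above and test.txt is assumed to exist:

import zipfile

# Create an archive and add one file to it
with zipfile.ZipFile('py38.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zf:
    zf.write('test.txt')

# Inspect and extract it
with zipfile.ZipFile('py38.zip') as zf:
    print(zf.namelist())
    zf.extractall('test_zip')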

5.3 Finding files

import os


def find_file(start, name):
    for rel_path, dirs, files in os.walk(start):
        if name in files:
            full_path = os.path.join(start, rel_path, name)
            # normpath cleans up any redundant path segments
            print(f'full path is: {os.path.normpath(os.path.abspath(full_path))}')

if __name__ == '__main__':
    find_file('/advanced_programming/chapter13', 'file_input.py')
import os
import time

# Find files modified within the last `seconds` seconds
def modified_within(top, seconds):
    now = time.time()
    for path, dirs, files in os.walk(top):
        for name in files:
            full_path = os.path.join(path, name)
            if not os.path.exists(full_path):
                continue

            m_time = os.path.getmtime(full_path)
            if m_time > (now - seconds):
                print(f'full path is: {full_path}')

if __name__ == '__main__':
    modified_within('/advanced_programming/chapter13', float(1000))
